From 7f0e2c67e0b0d38f83e8629093f27557413e28fe Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 8 May 2019 01:00:10 +0530 Subject: [PATCH] HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749). Contributed by Shashikant Banerjee --- .../hadoop/hdds/scm/XceiverClientRatis.java | 16 +- .../hdds/scm/client/HddsClientUtils.java | 70 ++- .../hdds/scm/storage/BlockOutputStream.java | 13 +- .../hdds/scm/storage/CommitWatcher.java | 1 - .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +- .../src/main/resources/ozone-default.xml | 4 +- hadoop-hdds/pom.xml | 2 +- .../hadoop/ozone/client/OzoneClientUtils.java | 38 -- .../client/io/BlockOutputStreamEntry.java | 7 + .../client/io/BlockOutputStreamEntryPool.java | 344 ++++++++++++ .../ozone/client/io/KeyOutputStream.java | 380 +++---------- .../TestBlockOutputStreamWithFailures.java | 356 ++++++------- .../TestCloseContainerHandlingByClient.java | 2 +- .../TestOzoneClientRetriesOnException.java | 8 +- .../ozone/client/rpc/TestWatchForCommit.java | 501 ++++++++++++++++++ .../ozone/container/ContainerTestHelper.java | 27 +- hadoop-ozone/pom.xml | 2 +- 17 files changed, 1196 insertions(+), 579 deletions(-) create mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 899bba8aad..efd82bce7b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -29,6 +29,7 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf @@ -69,7 +70,8 @@ * The underlying RPC mechanism can be chosen via the constructor. */ public final class XceiverClientRatis extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); + public static final Logger LOG = + LoggerFactory.getLogger(XceiverClientRatis.class); public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, @@ -248,13 +250,17 @@ public XceiverClientReply watchForCommit(long index, long timeout) return clientReply; } LOG.debug("commit index : {} watch timeout : {}", index, timeout); - CompletableFuture replyFuture = getClient() - .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); RaftClientReply reply; try { + CompletableFuture replyFuture = getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); replyFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException toe) { - LOG.warn("3 way commit failed ", toe); + } catch (Exception e) { + Throwable t = HddsClientUtils.checkForException(e); + LOG.warn("3 way commit failed ", e); + if (t instanceof GroupMismatchException) { + throw e; + } reply = getClient() .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java index be9bc9320f..97b8f95b87 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java @@ -28,8 +28,11 @@ import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -40,6 +43,10 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.NotReplicatedException; +import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +57,12 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.TimeoutException; /** * Utility methods for Ozone and Container Clients. @@ -72,6 +83,18 @@ public final class HddsClientUtils { private HddsClientUtils() { } + private static final List> EXCEPTION_LIST = + new ArrayList>() {{ + add(TimeoutException.class); + add(ContainerNotOpenException.class); + add(RaftRetryFailureException.class); + add(AlreadyClosedException.class); + add(GroupMismatchException.class); + // Not Replicated Exception will be thrown if watch For commit + // does not succeed + add(NotReplicatedException.class); + }}; + /** * Date format that used in ozone. Here the format is thread safe to use. */ @@ -290,4 +313,49 @@ public static SCMSecurityProtocol getScmSecurityClient( Client.getRpcTimeout(conf))); return scmSecurityClient; } + + public static Throwable checkForException(Exception e) throws IOException { + Throwable t = e; + while (t != null) { + for (Class cls : getExceptionList()) { + if (cls.isInstance(t)) { + return t; + } + } + t = t.getCause(); + } + + throw e instanceof IOException ? (IOException)e : new IOException(e); + } + + public static RetryPolicy createRetryPolicy(int maxRetryCount, + long retryInterval) { + // retry with fixed sleep between retries + return RetryPolicies.retryUpToMaximumCountWithFixedSleep( + maxRetryCount, retryInterval, TimeUnit.MILLISECONDS); + } + + public static Map, + RetryPolicy> getRetryPolicyByException(int maxRetryCount, + long retryInterval) { + Map, RetryPolicy> policyMap = new HashMap<>(); + for (Class ex : EXCEPTION_LIST) { + if (ex == TimeoutException.class + || ex == RaftRetryFailureException.class) { + // retry without sleep + policyMap.put(ex, createRetryPolicy(maxRetryCount, 0)); + } else { + // retry with fixed sleep between retries + policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval)); + } + } + // Default retry policy + policyMap + .put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval)); + return policyMap; + } + + public static List> getExceptionList() { + return EXCEPTION_LIST; + } } \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 139f49404c..5ca32630c8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream { public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStream.class); - private BlockID blockID; + private volatile BlockID blockID; private final String key; private final String traceID; private final BlockData.Builder containerBlockData; @@ -574,7 +574,7 @@ public void cleanup(boolean invalidateClient) { * @throws IOException if stream is closed */ private void checkOpen() throws IOException { - if (xceiverClient == null) { + if (isClosed()) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { adjustBuffersOnException(); @@ -582,6 +582,10 @@ private void checkOpen() throws IOException { } } + public boolean isClosed() { + return xceiverClient == null; + } + /** * Writes buffered data as a new chunk to the container and saves chunk * information to be used later in putKey call. @@ -635,4 +639,9 @@ private void writeChunkToContainer(ByteBuffer chunk) throws IOException { + " length " + effectiveChunkSize); containerBlockData.addChunks(chunkInfo); } + + @VisibleForTesting + public void setXceiverClient(XceiverClientSpi xceiverClient) { + this.xceiverClient = xceiverClient; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index aeac941af9..d4606b514c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -188,7 +188,6 @@ void releaseBuffersOnException() { */ public XceiverClientReply watchForCommit(long commitIndex) throws IOException { - Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty()); long index; try { XceiverClientReply reply = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index ccfd7ca1dd..7dfbc464f7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -121,12 +121,12 @@ public final class ScmConfigKeys { TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = "dfs.ratis.client.request.max.retries"; - public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20; + public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180; public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY = "dfs.ratis.client.request.retry.interval"; public static final TimeDuration DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT = - TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); + TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY = "dfs.ratis.server.retry-cache.timeout.duration"; public static final TimeDuration diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 6aafb58a3b..ddb97ddf23 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -237,13 +237,13 @@ dfs.ratis.client.request.max.retries - 20 + 180 OZONE, RATIS, MANAGEMENT Number of retries for ratis client request. dfs.ratis.client.request.retry.interval - 500ms + 1000ms OZONE, RATIS, MANAGEMENT Interval between successive retries for a ratis client request. diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index f77fe9db9f..c063cbf5db 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.5.0-SNAPSHOT - 0.3.0 + 0.4.0-fe2b15d-SNAPSHOT 1.60 diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 83632b5c67..38c0ba7684 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -18,15 +18,11 @@ package org.apache.hadoop.ozone.client; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.OzoneConsts; @@ -36,23 +32,11 @@ import org.apache.hadoop.ozone.client.rest.response.KeyLocation; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.client.rest.response.VolumeOwner; -import org.apache.ratis.protocol.AlreadyClosedException; -import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.RaftRetryFailureException; /** A utility class for OzoneClient. */ public final class OzoneClientUtils { private OzoneClientUtils() {} - - private static final List> EXCEPTION_LIST = - new ArrayList>() {{ - add(TimeoutException.class); - add(ContainerNotOpenException.class); - add(RaftRetryFailureException.class); - add(AlreadyClosedException.class); - add(GroupMismatchException.class); - }}; /** * Returns a BucketInfo object constructed using fields of the input * OzoneBucket object. @@ -141,26 +125,4 @@ public static RetryPolicy createRetryPolicy(int maxRetryCount, maxRetryCount, retryInterval, TimeUnit.MILLISECONDS); } - public static List> getExceptionList() { - return EXCEPTION_LIST; - } - - public static Map, RetryPolicy> - getRetryPolicyByException(int maxRetryCount, long retryInterval) { - Map, RetryPolicy> policyMap = new HashMap<>(); - for (Class ex : EXCEPTION_LIST) { - if (ex == TimeoutException.class || - ex == RaftRetryFailureException.class) { - // retry without sleep - policyMap.put(ex, createRetryPolicy(maxRetryCount, 0)); - } else { - // retry with fixed sleep between retries - policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval)); - } - } - // Default retry policy - policyMap.put(Exception.class, createRetryPolicy( - maxRetryCount, retryInterval)); - return policyMap; - } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index fb700da001..e11eab90e4 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -149,6 +149,13 @@ public void close() throws IOException { } } + boolean isClosed() { + if (outputStream != null) { + return ((BlockOutputStream) outputStream).isClosed(); + } + return false; + } + long getTotalAckDataLength() { if (outputStream != null) { BlockOutputStream out = (BlockOutputStream) this.outputStream; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java new file mode 100644 index 0000000000..7a8af65b85 --- /dev/null +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -0,0 +1,344 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.helpers.*; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +/** + * This class manages the stream entries list and handles block allocation + * from OzoneManager. + */ +public class BlockOutputStreamEntryPool { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockOutputStreamEntryPool.class); + + private final List streamEntries; + private int currentStreamIndex; + private final OzoneManagerProtocol omClient; + private final OmKeyArgs keyArgs; + private final XceiverClientManager xceiverClientManager; + private final int chunkSize; + private final String requestID; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; + private final int bytesPerChecksum; + private final ContainerProtos.ChecksumType checksumType; + private final BufferPool bufferPool; + private OmMultipartCommitUploadPartInfo commitUploadPartInfo; + private final long openID; + private ExcludeList excludeList; + + @SuppressWarnings("parameternumber") + public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, + int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, + HddsProtos.ReplicationType type, long bufferFlushSize, long bufferMaxSize, + long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, + int bytesPerChecksum, String uploadID, int partNumber, + boolean isMultipart, OmKeyInfo info, + XceiverClientManager xceiverClientManager, long openID) { + streamEntries = new ArrayList<>(); + currentStreamIndex = 0; + this.omClient = omClient; + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setType(type).setFactor(factor).setDataSize(info.getDataSize()) + .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) + .setMultipartUploadPartNumber(partNumber).build(); + this.xceiverClientManager = xceiverClientManager; + this.chunkSize = chunkSize; + this.requestID = requestId; + this.streamBufferFlushSize = bufferFlushSize; + this.streamBufferMaxSize = bufferMaxSize; + this.blockSize = size; + this.watchTimeout = watchTimeout; + this.bytesPerChecksum = bytesPerChecksum; + this.checksumType = checksumType; + this.openID = openID; + this.excludeList = new ExcludeList(); + + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); + Preconditions.checkState(blockSize % streamBufferMaxSize == 0); + this.bufferPool = + new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize); + } + + public BlockOutputStreamEntryPool() { + streamEntries = new ArrayList<>(); + omClient = null; + keyArgs = null; + xceiverClientManager = null; + chunkSize = 0; + requestID = null; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + bufferPool = new BufferPool(chunkSize, 1); + watchTimeout = 0; + blockSize = 0; + this.checksumType = ContainerProtos.ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + this.bytesPerChecksum = OzoneConfigKeys + .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB + currentStreamIndex = 0; + openID = -1; + } + + /** + * When a key is opened, it is possible that there are some blocks already + * allocated to it for this open session. In this case, to make use of these + * blocks, we need to add these blocks to stream entries. But, a key's version + * also includes blocks from previous versions, we need to avoid adding these + * old blocks to stream entries, because these old blocks should not be picked + * for write. To do this, the following method checks that, only those + * blocks created in this particular open version are added to stream entries. + * + * @param version the set of blocks that are pre-allocated. + * @param openVersion the version corresponding to the pre-allocation. + * @throws IOException + */ + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, + long openVersion) throws IOException { + // server may return any number of blocks, (0 to any) + // only the blocks allocated in this open session (block createVersion + // equals to open session version) + for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { + if (subKeyInfo.getCreateVersion() == openVersion) { + addKeyLocationInfo(subKeyInfo); + } + } + } + + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) + throws IOException { + Preconditions.checkNotNull(subKeyInfo.getPipeline()); + UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); + BlockOutputStreamEntry.Builder builder = + new BlockOutputStreamEntry.Builder() + .setBlockID(subKeyInfo.getBlockID()) + .setKey(keyArgs.getKeyName()) + .setXceiverClientManager(xceiverClientManager) + .setPipeline(subKeyInfo.getPipeline()) + .setRequestId(requestID) + .setChunkSize(chunkSize) + .setLength(subKeyInfo.getLength()) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setbufferPool(bufferPool) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) + .setToken(subKeyInfo.getToken()); + streamEntries.add(builder.build()); + } + + public List getLocationInfoList() { + List locationInfoList = new ArrayList<>(); + for (BlockOutputStreamEntry streamEntry : streamEntries) { + long length = streamEntry.getCurrentPosition(); + + // Commit only those blocks to OzoneManager which are not empty + if (length != 0) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()).build(); + locationInfoList.add(info); + } + LOG.debug( + "block written " + streamEntry.getBlockID() + ", length " + length + + " bcsID " + streamEntry.getBlockID() + .getBlockCommitSequenceId()); + } + return locationInfoList; + } + + /** + * Discards the subsequent pre allocated blocks and removes the streamEntries + * from the streamEntries list for the container which is closed. + * @param containerID id of the closed container + * @param pipelineId id of the associated pipeline + */ + void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) { + // currentStreamIndex < streamEntries.size() signifies that, there are still + // pre allocated blocks available. + + // This will be called only to discard the next subsequent unused blocks + // in the streamEntryList. + if (currentStreamIndex + 1 < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex + 1); + while (streamEntryIterator.hasNext()) { + BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); + Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); + if ((pipelineId != null && streamEntry.getPipeline().getId() + .equals(pipelineId)) || (containerID != -1 + && streamEntry.getBlockID().getContainerID() == containerID)) { + streamEntryIterator.remove(); + } + } + } + } + + List getStreamEntries() { + return streamEntries; + } + + XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + String getKeyName() { + return keyArgs.getKeyName(); + } + + long getKeyLength() { + return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum(); + } + /** + * Contact OM to get a new block. Set the new block with the index (e.g. + * first block has index = 0, second has index = 1 etc.) + * + * The returned block is made to new BlockOutputStreamEntry to write. + * + * @throws IOException + */ + private void allocateNewBlock() throws IOException { + OmKeyLocationInfo subKeyInfo = + omClient.allocateBlock(keyArgs, openID, excludeList); + addKeyLocationInfo(subKeyInfo); + } + + + void commitKey(long offset) throws IOException { + if (keyArgs != null) { + // in test, this could be null + long length = getKeyLength(); + Preconditions.checkArgument(offset == length); + keyArgs.setDataSize(length); + keyArgs.setLocationInfoList(getLocationInfoList()); + // When the key is multipart upload part file upload, we should not + // commit the key, as this is not an actual key, this is a just a + // partial key of a large file. + if (keyArgs.getIsMultipartKey()) { + commitUploadPartInfo = + omClient.commitMultipartUploadPart(keyArgs, openID); + } else { + omClient.commitKey(keyArgs, openID); + } + } else { + LOG.warn("Closing KeyOutputStream, but key args is null"); + } + } + + public BlockOutputStreamEntry getCurrentStreamEntry() { + if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) { + return null; + } else { + return streamEntries.get(currentStreamIndex); + } + } + + BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { + BlockOutputStreamEntry streamEntry = getCurrentStreamEntry(); + if (streamEntry != null && streamEntry.isClosed()) { + // a stream entry gets closed either by : + // a. If the stream gets full + // b. it has encountered an exception + currentStreamIndex++; + } + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + int succeededAllocates = 0; + try { + allocateNewBlock(); + succeededAllocates += 1; + } catch (IOException ioe) { + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + " blocks for this write."); + throw ioe; + } + } + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); + return current; + } + + long computeBufferData() { + return bufferPool.computeBufferData(); + } + + void cleanup() { + if (excludeList != null) { + excludeList.clear(); + excludeList = null; + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } + + public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { + return commitUploadPartInfo; + } + + public ExcludeList getExcludeList() { + return excludeList; + } + + public long getStreamBufferMaxSize() { + return streamBufferMaxSize; + } + + boolean isEmpty() { + return streamEntries.isEmpty(); + } +} \ No newline at end of file diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index c4c2524fea..00c4d02715 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -23,21 +23,18 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; -import org.apache.hadoop.hdds.scm.storage.BufferPool; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; @@ -47,10 +44,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.ArrayList; import java.util.List; import java.util.Collection; -import java.util.ListIterator; import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -77,84 +72,41 @@ enum StreamAction { public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); - // array list's get(index) is O(1) - private final ArrayList streamEntries; - private int currentStreamIndex; - private final OzoneManagerProtocol omClient; - private final OmKeyArgs keyArgs; - private final long openID; - private final XceiverClientManager xceiverClientManager; - private final int chunkSize; - private final String requestID; private boolean closed; - private final long streamBufferFlushSize; - private final long streamBufferMaxSize; - private final long watchTimeout; - private final long blockSize; - private final int bytesPerChecksum; - private final ChecksumType checksumType; - private final BufferPool bufferPool; - private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private FileEncryptionInfo feInfo; - private ExcludeList excludeList; private final Map, RetryPolicy> retryPolicyMap; private int retryCount; private long offset; + private final BlockOutputStreamEntryPool blockOutputStreamEntryPool; + /** * A constructor for testing purpose only. */ @VisibleForTesting - @SuppressWarnings("parameternumber") public KeyOutputStream() { - streamEntries = new ArrayList<>(); - omClient = null; - keyArgs = null; - openID = -1; - xceiverClientManager = null; - chunkSize = 0; - requestID = null; closed = false; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferPool = new BufferPool(chunkSize, 1); - watchTimeout = 0; - blockSize = 0; - this.checksumType = ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - this.bytesPerChecksum = OzoneConfigKeys - .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB - this.retryPolicyMap = OzoneClientUtils.getExceptionList() + this.retryPolicyMap = HddsClientUtils.getExceptionList() .stream() .collect(Collectors.toMap(Function.identity(), e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); retryCount = 0; offset = 0; + blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(); } @VisibleForTesting public List getStreamEntries() { - return streamEntries; - } - @VisibleForTesting - public XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; + return blockOutputStreamEntryPool.getStreamEntries(); } - public List getLocationInfoList() throws IOException { - List locationInfoList = new ArrayList<>(); - for (BlockOutputStreamEntry streamEntry : streamEntries) { - OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) - .setLength(streamEntry.getCurrentPosition()).setOffset(0) - .setToken(streamEntry.getToken()) - .setPipeline(streamEntry.getPipeline()) - .build(); - LOG.debug("block written " + streamEntry.getBlockID() + ", length " - + streamEntry.getCurrentPosition() + " bcsID " - + streamEntry.getBlockID().getBlockCommitSequenceId()); - locationInfoList.add(info); - } - return locationInfoList; + @VisibleForTesting + public XceiverClientManager getXceiverClientManager() { + return blockOutputStreamEntryPool.getXceiverClientManager(); + } + + @VisibleForTesting + public List getLocationInfoList() { + return blockOutputStreamEntryPool.getLocationInfoList(); } @VisibleForTesting @@ -171,41 +123,16 @@ public KeyOutputStream(OpenKeySession handler, ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, int maxRetryCount, long retryInterval) { - this.streamEntries = new ArrayList<>(); - this.currentStreamIndex = 0; - this.omClient = omClient; OmKeyInfo info = handler.getKeyInfo(); + blockOutputStreamEntryPool = + new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, + type, bufferFlushSize, bufferMaxSize, size, watchTimeout, + checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart, + info, xceiverClientManager, handler.getId()); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); - this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) - .setType(type).setFactor(factor).setDataSize(info.getDataSize()) - .setIsMultipartKey(isMultipart).setMultipartUploadID( - uploadID).setMultipartUploadPartNumber(partNumber) - .build(); - this.openID = handler.getId(); - this.xceiverClientManager = xceiverClientManager; - this.chunkSize = chunkSize; - this.requestID = requestId; - this.streamBufferFlushSize = bufferFlushSize; - this.streamBufferMaxSize = bufferMaxSize; - this.blockSize = size; - this.watchTimeout = watchTimeout; - this.bytesPerChecksum = bytesPerChecksum; - this.checksumType = checksumType; - - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(streamBufferFlushSize > 0); - Preconditions.checkState(streamBufferMaxSize > 0); - Preconditions.checkState(blockSize > 0); - Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); - Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); - Preconditions.checkState(blockSize % streamBufferMaxSize == 0); - this.bufferPool = - new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize); - this.excludeList = new ExcludeList(); - this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException( + this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( maxRetryCount, retryInterval); this.retryCount = 0; } @@ -225,37 +152,7 @@ public KeyOutputStream(OpenKeySession handler, */ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) throws IOException { - // server may return any number of blocks, (0 to any) - // only the blocks allocated in this open session (block createVersion - // equals to open session version) - for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { - if (subKeyInfo.getCreateVersion() == openVersion) { - addKeyLocationInfo(subKeyInfo); - } - } - } - - private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { - Preconditions.checkNotNull(subKeyInfo.getPipeline()); - UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); - BlockOutputStreamEntry.Builder builder = - new BlockOutputStreamEntry.Builder() - .setBlockID(subKeyInfo.getBlockID()) - .setKey(keyArgs.getKeyName()) - .setXceiverClientManager(xceiverClientManager) - .setPipeline(subKeyInfo.getPipeline()) - .setRequestId(requestID) - .setChunkSize(chunkSize) - .setLength(subKeyInfo.getLength()) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setWatchTimeout(watchTimeout) - .setbufferPool(bufferPool) - .setChecksumType(checksumType) - .setBytesPerChecksum(bytesPerChecksum) - .setToken(subKeyInfo.getToken()); - streamEntries.add(builder.build()); + blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); } @Override @@ -294,34 +191,12 @@ public void write(byte[] b, int off, int len) handleWrite(b, off, len, false); } - private long computeBufferData() { - return bufferPool.computeBufferData(); - } - private void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { - int succeededAllocates = 0; while (len > 0) { try { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates - + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - + BlockOutputStreamEntry current = + blockOutputStreamEntryPool.allocateBlockIfNeeded(); // length(len) will be in int range if the call is happening through // write API of blockOutputStream. Length can be in long range if it // comes via Exception path. @@ -342,7 +217,8 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) // to or less than the max length of the buffer allocated. // The len specified here is the combined sum of the data length of // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); + Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool + .getStreamBufferMaxSize()); int dataWritten = (int) (current.getWrittenDataLength() - currentPos); writeLen = retry ? (int) len : dataWritten; // In retry path, the data written is already accounted in offset. @@ -350,7 +226,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) offset += writeLen; } LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, ioe); + handleException(current, ioe); } if (current.getRemaining() <= 0) { // since the current block is already written close the stream. @@ -365,80 +241,19 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) } } - /** - * Discards the subsequent pre allocated blocks and removes the streamEntries - * from the streamEntries list for the container which is closed. - * @param containerID id of the closed container - * @param pipelineId id of the associated pipeline - * @param streamIndex index of the stream - */ - private void discardPreallocatedBlocks(long containerID, - PipelineID pipelineId, int streamIndex) { - // streamIndex < streamEntries.size() signifies that, there are still - // pre allocated blocks available. - - // This will be called only to discard the next subsequent unused blocks - // in the streamEntryList. - if (streamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = - streamEntries.listIterator(streamIndex); - while (streamEntryIterator.hasNext()) { - BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); - Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); - if (((pipelineId != null && streamEntry.getPipeline().getId() - .equals(pipelineId)) || (containerID != -1 - && streamEntry.getBlockID().getContainerID() == containerID))) { - streamEntryIterator.remove(); - } - } - } - } - - /** - * It might be possible that the blocks pre allocated might never get written - * while the stream gets closed normally. In such cases, it would be a good - * idea to trim down the locationInfoList by removing the unused blocks if any - * so as only the used block info gets updated on OzoneManager during close. - */ - private void removeEmptyBlocks() { - if (currentStreamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = - streamEntries.listIterator(currentStreamIndex); - while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().getCurrentPosition() == 0) { - streamEntryIterator.remove(); - } - } - } - } - - private void cleanup() { - if (excludeList != null) { - excludeList.clear(); - excludeList = null; - } - if (bufferPool != null) { - bufferPool.clearBufferPool(); - } - - if (streamEntries != null) { - streamEntries.clear(); - } - } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in - * datanode. + * datanode. * b. Reads the data from the underlying buffer and writes it the next stream. * * @param streamEntry StreamEntry - * @param streamIndex Index of the entry - * @param exception actual exception that occurred + * @param exception actual exception that occurred * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - int streamIndex, IOException exception) throws IOException { - Throwable t = checkForException(exception); + IOException exception) throws IOException { + Throwable t = HddsClientUtils.checkForException(exception); boolean retryFailure = checkForRetryFailure(t); boolean closedContainerException = false; if (!retryFailure) { @@ -448,15 +263,19 @@ private void handleException(BlockOutputStreamEntry streamEntry, long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); - long bufferedDataLen = computeBufferData(); - LOG.warn("Encountered exception {}. The last committed block length is {}, " + long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData(); + LOG.debug( + "Encountered exception {}. The last committed block length is {}, " + "uncommitted data length is {} retry count {}", exception, totalSuccessfulFlushedData, bufferedDataLen, retryCount); - Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); - Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); + Preconditions.checkArgument( + bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize()); + Preconditions.checkArgument( + offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); + ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList(); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } @@ -470,45 +289,42 @@ private void handleException(BlockOutputStreamEntry streamEntry, // just clean up the current stream. streamEntry.cleanup(retryFailure); - // discard all sunsequent blocks the containers and pipelines which + // discard all subsequent blocks the containers and pipelines which // are in the exclude list so that, the very next retry should never // write data on the closed container/pipeline if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, - streamIndex + 1); + blockOutputStreamEntryPool + .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), + null); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the - // pipeline, just discard all the preallocated blocks on this pipeline. + // pipeline, just discard all the pre allocated blocks on this pipeline. // Next block allocation will happen with excluding this specific pipeline // This will ensure if 2 way commit happens , it cannot span over multiple // blocks - discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1); + blockOutputStreamEntryPool + .discardPreallocatedBlocks(-1, pipelineId); } if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. - currentStreamIndex += 1; handleRetry(exception, bufferedDataLen); // reset the retryCount after handling the exception retryCount = 0; } - if (totalSuccessfulFlushedData == 0) { - streamEntries.remove(streamIndex); - currentStreamIndex -= 1; - } } private void markStreamClosed() { - cleanup(); + blockOutputStreamEntryPool.cleanup(); closed = true; } private void handleRetry(IOException exception, long len) throws IOException { - RetryPolicy retryPolicy = - retryPolicyMap.get(checkForException(exception).getClass()); + RetryPolicy retryPolicy = retryPolicyMap + .get(HddsClientUtils.checkForException(exception).getClass()); if (retryPolicy == null) { retryPolicy = retryPolicyMap.get(Exception.class); } @@ -544,10 +360,11 @@ private void handleRetry(IOException exception, long len) throws IOException { } } retryCount++; - LOG.trace("Retrying Write request. Already tried " - + retryCount + " time(s); retry policy is " + retryPolicy); + LOG.trace("Retrying Write request. Already tried " + retryCount + + " time(s); retry policy is " + retryPolicy); handleWrite(null, 0, len, true); } + /** * Checks if the provided exception signifies retry failure in ratis client. * In case of retry failure, ratis client throws RaftRetryFailureException @@ -562,40 +379,6 @@ private boolean checkIfContainerIsClosed(Throwable t) { return t instanceof ContainerNotOpenException; } - public Throwable checkForException(IOException ioe) throws IOException { - Throwable t = ioe.getCause(); - while (t != null) { - for (Class cls : OzoneClientUtils - .getExceptionList()) { - if (cls.isInstance(t)) { - return t; - } - } - t = t.getCause(); - } - throw ioe; - } - - private long getKeyLength() { - return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()) - .sum(); - } - - /** - * Contact OM to get a new block. Set the new block with the index (e.g. - * first block has index = 0, second has index = 1 etc.) - * - * The returned block is made to new BlockOutputStreamEntry to write. - * - * @param index the index of the block. - * @throws IOException - */ - private void allocateNewBlock(int index) throws IOException { - OmKeyLocationInfo subKeyInfo = - omClient.allocateBlock(keyArgs, openID, excludeList); - addKeyLocationInfo(subKeyInfo); - } - @Override public void flush() throws IOException { checkNotClosed(); @@ -612,20 +395,19 @@ public void flush() throws IOException { * written to new stream , it will be at max half full. In such cases, we * should just write the data and not close the stream as the block won't be * completely full. + * * @param op Flag which decides whether to call close or flush on the - * outputStream. + * outputStream. * @throws IOException In case, flush or close fails with exception. */ private void handleFlushOrClose(StreamAction op) throws IOException { - if (streamEntries.size() == 0) { + if (blockOutputStreamEntryPool.isEmpty()) { return; } while (true) { try { - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + BlockOutputStreamEntry entry = + blockOutputStreamEntryPool.getCurrentStreamEntry(); if (entry != null) { try { Collection failedServers = @@ -633,7 +415,8 @@ private void handleFlushOrClose(StreamAction op) throws IOException { // failed servers can be null in case there is no data written in // the stream if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); + blockOutputStreamEntryPool.getExcludeList() + .addDatanodes(failedServers); } switch (op) { case CLOSE: @@ -642,7 +425,6 @@ private void handleFlushOrClose(StreamAction op) throws IOException { case FULL: if (entry.getRemaining() == 0) { entry.close(); - currentStreamIndex++; } break; case FLUSH: @@ -652,7 +434,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException { throw new IOException("Invalid Operation"); } } catch (IOException ioe) { - handleException(entry, streamIndex, ioe); + handleException(entry, ioe); continue; } } @@ -677,34 +459,16 @@ public void close() throws IOException { closed = true; try { handleFlushOrClose(StreamAction.CLOSE); - if (keyArgs != null) { - // in test, this could be null - removeEmptyBlocks(); - long length = getKeyLength(); - Preconditions.checkArgument(offset == length); - keyArgs.setDataSize(length); - keyArgs.setLocationInfoList(getLocationInfoList()); - // When the key is multipart upload part file upload, we should not - // commit the key, as this is not an actual key, this is a just a - // partial key of a large file. - if (keyArgs.getIsMultipartKey()) { - commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs, - openID); - } else { - omClient.commitKey(keyArgs, openID); - } - } else { - LOG.warn("Closing KeyOutputStream, but key args is null"); - } + blockOutputStreamEntryPool.commitKey(offset); } catch (IOException ioe) { throw ioe; } finally { - cleanup(); + blockOutputStreamEntryPool.cleanup(); } } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - return commitUploadPartInfo; + return blockOutputStreamEntryPool.getCommitUploadPartInfo(); } public FileEncryptionInfo getFileEncryptionInfo() { @@ -713,7 +477,7 @@ public FileEncryptionInfo getFileEncryptionInfo() { @VisibleForTesting public ExcludeList getExcludeList() { - return excludeList; + return blockOutputStreamEntryPool.getExcludeList(); } /** @@ -739,7 +503,6 @@ public static class Builder { private int maxRetryCount; private long retryInterval; - public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; @@ -760,8 +523,7 @@ public Builder setXceiverClientManager(XceiverClientManager manager) { return this; } - public Builder setOmClient( - OzoneManagerProtocol client) { + public Builder setOmClient(OzoneManagerProtocol client) { this.omClient = client; return this; } @@ -806,12 +568,12 @@ public Builder setWatchTimeout(long timeout) { return this; } - public Builder setChecksumType(ChecksumType cType){ + public Builder setChecksumType(ChecksumType cType) { this.checksumType = cType; return this; } - public Builder setBytesPerChecksum(int bytes){ + public Builder setBytesPerChecksum(int bytes) { this.bytesPerChecksum = bytes; return this; } @@ -831,9 +593,9 @@ public Builder setRetryInterval(long retryIntervalInMS) { return this; } - public KeyOutputStream build() throws IOException { - return new KeyOutputStream(openHandler, xceiverManager, - omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, + public KeyOutputStream build() { + return new KeyOutputStream(openHandler, xceiverManager, omClient, + chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, maxRetryCount, retryInterval); @@ -848,8 +610,8 @@ public KeyOutputStream build() throws IOException { private void checkNotClosed() throws IOException { if (closed) { throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs - .getKeyName()); + ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + + blockOutputStreamEntryPool.getKeyName()); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 89a2af966a..dfccb98ea0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.container.common.helpers - .ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -75,27 +75,23 @@ public class TestBlockOutputStreamWithFailures { * * @throws IOException */ - @Before - public void init() throws Exception { + @Before public void init() throws Exception { chunkSize = 100; flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; - conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(7) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) + .setBlockSize(blockSize).setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES) - .build(); + .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -114,25 +110,24 @@ private String getKeyName() { /** * Shutdown MiniDFSCluster. */ - @After - public void shutdown() { + @After public void shutdown() { if (cluster != null) { cluster.shutdown(); } } - @Test - public void testWatchForCommitWithCloseContainerException() throws Exception { + @Test public void testWatchForCommitWithCloseContainerException() + throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -155,15 +150,14 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -199,8 +193,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -233,9 +226,8 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -247,8 +239,7 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, @@ -259,25 +250,23 @@ public void testWatchForCommitWithCloseContainerException() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testWatchForCommitDatanodeFailure() throws Exception { + @Test public void testWatchForCommitDatanodeFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -299,14 +288,13 @@ public void testWatchForCommitDatanodeFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -344,8 +332,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -376,8 +363,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty @@ -396,25 +382,23 @@ public void testWatchForCommitDatanodeFailure() throws Exception { // 4 flushes at flushSize boundaries + 2 flush for partial chunks Assert.assertEquals(putBlockCount + 6, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 16, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void test2DatanodesFailure() throws Exception { + @Test public void test2DatanodesFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -436,14 +420,13 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -479,8 +462,7 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -512,7 +494,7 @@ public void test2DatanodesFailure() throws Exception { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -522,8 +504,7 @@ public void test2DatanodesFailure() throws Exception { key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -533,30 +514,27 @@ public void test2DatanodesFailure() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); validateData(keyName, data1); } - @Test - public void testFailureWithPrimeSizedData() throws Exception { + @Test public void testFailureWithPrimeSizedData() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -577,24 +555,21 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, - blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -613,8 +588,7 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -641,7 +615,7 @@ public void testFailureWithPrimeSizedData() throws Exception { key.flush(); Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -653,8 +627,7 @@ public void testFailureWithPrimeSizedData() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -664,26 +637,24 @@ public void testFailureWithPrimeSizedData() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, - metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); + Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testExceptionDuringClose() throws Exception { + @Test public void testExceptionDuringClose() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -704,24 +675,21 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, - blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -740,8 +708,7 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -767,15 +734,14 @@ public void testExceptionDuringClose() throws Exception { // now close the stream, It will hit exception key.close(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -785,26 +751,24 @@ public void testExceptionDuringClose() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testWatchForCommitWithSingleNodeRatis() throws Exception { + @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -828,15 +792,14 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -872,8 +835,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -907,7 +869,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { // and one flush for partial chunk key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -919,10 +881,9 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -931,25 +892,23 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testDatanodeFailureWithSingleNodeRatis() throws Exception { + @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -972,14 +931,13 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -1015,8 +973,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1044,7 +1001,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled @@ -1052,8 +1009,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -1073,27 +1029,25 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test - public void testDatanodeFailureWithPreAllocation() throws Exception { + @Test public void testDatanodeFailureWithPreAllocation() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + long putBlockCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -1117,14 +1071,13 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); + OutputStream stream = + keyOutputStream.getStreamEntries().get(0).getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -1160,8 +1113,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1188,7 +1140,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled @@ -1197,13 +1149,12 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1219,8 +1170,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 8be1cccb50..e551ab1ae2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -291,7 +291,7 @@ public void testMultiBlockWrites3() throws Exception { (KeyOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); - // write data 3 blocks and one more chunk + // write data 4 blocks and one more chunk byte[] writtenData = ContainerTestHelper.getFixedLengthString(keyString, keyLen) .getBytes(UTF_8); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 5cb6dbc047..5f6d494a24 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; @@ -50,7 +51,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -85,7 +85,7 @@ public void init() throws Exception { blockSize = 2 * maxFlushSize; conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); @@ -150,7 +150,7 @@ public void testGroupMismatchExceptionHandling() throws Exception { .getPipeline(container.getPipelineID()); ContainerTestHelper.waitForPipelineClose(key, cluster, true); key.flush(); - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); @@ -201,7 +201,7 @@ public void testMaxRetriesByOzoneClient() throws Exception { key.write(data1); Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { - Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java new file mode 100644 index 0000000000..90292eae09 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -0,0 +1,501 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.*; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.RaftRetryFailureException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * This class verifies the watchForCommit Handling by xceiverClient. + */ +public class TestWatchForCommit { + + private MiniOzoneCluster cluster; + private OzoneClient client; + private ObjectStore objectStore; + private String volumeName; + private String bucketName; + private String keyString; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static String containerOwner = "OZONE"; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + private void startCluster(OzoneConfiguration conf) throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + 1, TimeUnit.SECONDS); + + conf.setQuietMode(false); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "watchforcommithandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + storageContainerLocationClient = cluster + .getStorageContainerLocationClient(); + } + + + /** + * Shutdown MiniDFSCluster. + */ + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + @Test + public void testWatchForCommitWithKeyWrite() throws Exception { + // in this case, watch request should fail with RaftRetryFailureException + // and will be captured in keyOutputStream and the failover will happen + // to a different block + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5); + startCluster(conf); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes atleast putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This will call + // watchForCommit again. Since the commit will happen 2 way, the + // commitInfoMap will get updated for servers which are alive + + // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here + // once exception is hit + key.write(data1); + + // As a part of handling the exception, 4 failed writeChunks will be + // rewritten plus one partial chunk plus two putBlocks for flushSize + // and one flush for partial chunk + key.flush(); + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + .getIoException()) instanceof RaftRetryFailureException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // now close the stream, It will update the ack length after watchForCommit + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + key.close(); + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + validateData(keyName, data1); + shutdown(); + } + + @Test + public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + xceiverClient.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + try { + // just watch for a lo index which in not updated in the commitInfo Map + xceiverClient.watchForCommit(index + 1, 3000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue( + HddsClientUtils.checkForException(e) instanceof TimeoutException); + } + // After releasing the xceiverClient, this connection should be closed + // and any container operations should fail + clientManager.releaseClient(xceiverClient, false); + shutdown(); + } + + @Test + public void testWatchForCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + 100, TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + xceiverClient.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This wi + try { + // just watch for a lo index which in not updated in the commitInfo Map + xceiverClient.watchForCommit(index + 1, 20000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof RaftRetryFailureException); + } + clientManager.releaseClient(xceiverClient, false); + shutdown(); + } + + @Test + public void test2WayCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + xceiverClient.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = xceiverClient.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + xceiverClient.watchForCommit(reply.getLogIndex(), 20000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(xceiverClient, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void test2WayCommitForTimeoutException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + xceiverClient.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = xceiverClient.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + xceiverClient.watchForCommit(reply.getLogIndex(), 3000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(xceiverClient, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void testWatchForCommitForGroupMismatchException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); + + // mark the node stale early so that pipleline gets destroyed quickly + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, containerOwner); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; + long containerId = container1.getContainerInfo().getContainerID(); + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest(containerId, + xceiverClient.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + List pipelineList = new ArrayList<>(); + pipelineList.add(pipeline); + ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); + try { + // just watch for a lo index which in not updated in the commitInfo Map + xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000); + Assert.fail("Expected exception not thrown"); + } catch(Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof GroupMismatchException); + } + clientManager.releaseClient(xceiverClient, false); + shutdown(); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 463e42889a..4da190762b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -68,7 +69,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; @@ -723,11 +723,11 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, MiniOzoneCluster cluster) throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List locationInfoList = - keyOutputStream.getLocationInfoList(); + List streamEntryList = + keyOutputStream.getStreamEntries(); List containerIdList = new ArrayList<>(); - for (OmKeyLocationInfo info : locationInfoList) { - long id = info.getContainerID(); + for (BlockOutputStreamEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); if (!containerIdList.contains(id)) { containerIdList.add(id); } @@ -741,11 +741,14 @@ public static void waitForPipelineClose(OzoneOutputStream outputStream, throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List locationInfoList = - keyOutputStream.getLocationInfoList(); + List streamEntryList = + keyOutputStream.getStreamEntries(); List containerIdList = new ArrayList<>(); - for (OmKeyLocationInfo info : locationInfoList) { - containerIdList.add(info.getContainerID()); + for (BlockOutputStreamEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); + if (!containerIdList.contains(id)) { + containerIdList.add(id); + } } Assert.assertTrue(!containerIdList.isEmpty()); waitForPipelineClose(cluster, waitForContainerCreation, @@ -784,6 +787,12 @@ public static void waitForPipelineClose(MiniOzoneCluster cluster, } } } + waitForPipelineClose(pipelineList, cluster); + } + + public static void waitForPipelineClose(List pipelineList, + MiniOzoneCluster cluster) + throws TimeoutException, InterruptedException, IOException { for (Pipeline pipeline1 : pipelineList) { // issue pipeline destroy command cluster.getStorageContainerManager().getPipelineManager() diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 1811387e35..f56ce80bf8 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -29,7 +29,7 @@ 3.2.0 0.5.0-SNAPSHOT 0.5.0-SNAPSHOT - 0.3.0 + 0.4.0-fe2b15d-SNAPSHOT 1.60 Crater Lake ${ozone.version}