diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index efd82bce7b..899bba8aad 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -29,7 +29,6 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; -import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf @@ -70,8 +69,7 @@ * The underlying RPC mechanism can be chosen via the constructor. */ public final class XceiverClientRatis extends XceiverClientSpi { - public static final Logger LOG = - LoggerFactory.getLogger(XceiverClientRatis.class); + static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, @@ -250,17 +248,13 @@ public XceiverClientReply watchForCommit(long index, long timeout) return clientReply; } LOG.debug("commit index : {} watch timeout : {}", index, timeout); + CompletableFuture replyFuture = getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); RaftClientReply reply; try { - CompletableFuture replyFuture = getClient() - .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); replyFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch (Exception e) { - Throwable t = HddsClientUtils.checkForException(e); - LOG.warn("3 way commit failed ", e); - if (t instanceof GroupMismatchException) { - throw e; - } + } catch (TimeoutException toe) { + LOG.warn("3 way commit failed ", toe); reply = getClient() .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java index 97b8f95b87..be9bc9320f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java @@ -28,11 +28,8 @@ import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -43,10 +40,6 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.ratis.protocol.AlreadyClosedException; -import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.NotReplicatedException; -import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,12 +50,8 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + /** * Utility methods for Ozone and Container Clients. @@ -83,18 +72,6 @@ public final class HddsClientUtils { private HddsClientUtils() { } - private static final List> EXCEPTION_LIST = - new ArrayList>() {{ - add(TimeoutException.class); - add(ContainerNotOpenException.class); - add(RaftRetryFailureException.class); - add(AlreadyClosedException.class); - add(GroupMismatchException.class); - // Not Replicated Exception will be thrown if watch For commit - // does not succeed - add(NotReplicatedException.class); - }}; - /** * Date format that used in ozone. Here the format is thread safe to use. */ @@ -313,49 +290,4 @@ public static SCMSecurityProtocol getScmSecurityClient( Client.getRpcTimeout(conf))); return scmSecurityClient; } - - public static Throwable checkForException(Exception e) throws IOException { - Throwable t = e; - while (t != null) { - for (Class cls : getExceptionList()) { - if (cls.isInstance(t)) { - return t; - } - } - t = t.getCause(); - } - - throw e instanceof IOException ? (IOException)e : new IOException(e); - } - - public static RetryPolicy createRetryPolicy(int maxRetryCount, - long retryInterval) { - // retry with fixed sleep between retries - return RetryPolicies.retryUpToMaximumCountWithFixedSleep( - maxRetryCount, retryInterval, TimeUnit.MILLISECONDS); - } - - public static Map, - RetryPolicy> getRetryPolicyByException(int maxRetryCount, - long retryInterval) { - Map, RetryPolicy> policyMap = new HashMap<>(); - for (Class ex : EXCEPTION_LIST) { - if (ex == TimeoutException.class - || ex == RaftRetryFailureException.class) { - // retry without sleep - policyMap.put(ex, createRetryPolicy(maxRetryCount, 0)); - } else { - // retry with fixed sleep between retries - policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval)); - } - } - // Default retry policy - policyMap - .put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval)); - return policyMap; - } - - public static List> getExceptionList() { - return EXCEPTION_LIST; - } } \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 5ca32630c8..139f49404c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream { public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStream.class); - private volatile BlockID blockID; + private BlockID blockID; private final String key; private final String traceID; private final BlockData.Builder containerBlockData; @@ -574,7 +574,7 @@ public void cleanup(boolean invalidateClient) { * @throws IOException if stream is closed */ private void checkOpen() throws IOException { - if (isClosed()) { + if (xceiverClient == null) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { adjustBuffersOnException(); @@ -582,10 +582,6 @@ private void checkOpen() throws IOException { } } - public boolean isClosed() { - return xceiverClient == null; - } - /** * Writes buffered data as a new chunk to the container and saves chunk * information to be used later in putKey call. @@ -639,9 +635,4 @@ private void writeChunkToContainer(ByteBuffer chunk) throws IOException { + " length " + effectiveChunkSize); containerBlockData.addChunks(chunkInfo); } - - @VisibleForTesting - public void setXceiverClient(XceiverClientSpi xceiverClient) { - this.xceiverClient = xceiverClient; - } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index d4606b514c..aeac941af9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -188,6 +188,7 @@ void releaseBuffersOnException() { */ public XceiverClientReply watchForCommit(long commitIndex) throws IOException { + Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty()); long index; try { XceiverClientReply reply = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 7dfbc464f7..ccfd7ca1dd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -121,12 +121,12 @@ public final class ScmConfigKeys { TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = "dfs.ratis.client.request.max.retries"; - public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180; + public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20; public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY = "dfs.ratis.client.request.retry.interval"; public static final TimeDuration DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT = - TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS); + TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY = "dfs.ratis.server.retry-cache.timeout.duration"; public static final TimeDuration diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index ddb97ddf23..6aafb58a3b 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -237,13 +237,13 @@ dfs.ratis.client.request.max.retries - 180 + 20 OZONE, RATIS, MANAGEMENT Number of retries for ratis client request. dfs.ratis.client.request.retry.interval - 1000ms + 500ms OZONE, RATIS, MANAGEMENT Interval between successive retries for a ratis client request. diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index c063cbf5db..f77fe9db9f 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.5.0-SNAPSHOT - 0.4.0-fe2b15d-SNAPSHOT + 0.3.0 1.60 diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 38c0ba7684..83632b5c67 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -18,11 +18,15 @@ package org.apache.hadoop.ozone.client; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.OzoneConsts; @@ -32,11 +36,23 @@ import org.apache.hadoop.ozone.client.rest.response.KeyLocation; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.client.rest.response.VolumeOwner; +import org.apache.ratis.protocol.AlreadyClosedException; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.RaftRetryFailureException; /** A utility class for OzoneClient. */ public final class OzoneClientUtils { private OzoneClientUtils() {} + + private static final List> EXCEPTION_LIST = + new ArrayList>() {{ + add(TimeoutException.class); + add(ContainerNotOpenException.class); + add(RaftRetryFailureException.class); + add(AlreadyClosedException.class); + add(GroupMismatchException.class); + }}; /** * Returns a BucketInfo object constructed using fields of the input * OzoneBucket object. @@ -125,4 +141,26 @@ public static RetryPolicy createRetryPolicy(int maxRetryCount, maxRetryCount, retryInterval, TimeUnit.MILLISECONDS); } + public static List> getExceptionList() { + return EXCEPTION_LIST; + } + + public static Map, RetryPolicy> + getRetryPolicyByException(int maxRetryCount, long retryInterval) { + Map, RetryPolicy> policyMap = new HashMap<>(); + for (Class ex : EXCEPTION_LIST) { + if (ex == TimeoutException.class || + ex == RaftRetryFailureException.class) { + // retry without sleep + policyMap.put(ex, createRetryPolicy(maxRetryCount, 0)); + } else { + // retry with fixed sleep between retries + policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval)); + } + } + // Default retry policy + policyMap.put(Exception.class, createRetryPolicy( + maxRetryCount, retryInterval)); + return policyMap; + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index e11eab90e4..fb700da001 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -149,13 +149,6 @@ public void close() throws IOException { } } - boolean isClosed() { - if (outputStream != null) { - return ((BlockOutputStream) outputStream).isClosed(); - } - return false; - } - long getTotalAckDataLength() { if (outputStream != null) { BlockOutputStream out = (BlockOutputStream) this.outputStream; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java deleted file mode 100644 index 7a8af65b85..0000000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ /dev/null @@ -1,344 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.client.io; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.storage.BufferPool; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.om.helpers.*; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.ListIterator; - -/** - * This class manages the stream entries list and handles block allocation - * from OzoneManager. - */ -public class BlockOutputStreamEntryPool { - - public static final Logger LOG = - LoggerFactory.getLogger(BlockOutputStreamEntryPool.class); - - private final List streamEntries; - private int currentStreamIndex; - private final OzoneManagerProtocol omClient; - private final OmKeyArgs keyArgs; - private final XceiverClientManager xceiverClientManager; - private final int chunkSize; - private final String requestID; - private final long streamBufferFlushSize; - private final long streamBufferMaxSize; - private final long watchTimeout; - private final long blockSize; - private final int bytesPerChecksum; - private final ContainerProtos.ChecksumType checksumType; - private final BufferPool bufferPool; - private OmMultipartCommitUploadPartInfo commitUploadPartInfo; - private final long openID; - private ExcludeList excludeList; - - @SuppressWarnings("parameternumber") - public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, - int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, - HddsProtos.ReplicationType type, long bufferFlushSize, long bufferMaxSize, - long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, - int bytesPerChecksum, String uploadID, int partNumber, - boolean isMultipart, OmKeyInfo info, - XceiverClientManager xceiverClientManager, long openID) { - streamEntries = new ArrayList<>(); - currentStreamIndex = 0; - this.omClient = omClient; - this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) - .setType(type).setFactor(factor).setDataSize(info.getDataSize()) - .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) - .setMultipartUploadPartNumber(partNumber).build(); - this.xceiverClientManager = xceiverClientManager; - this.chunkSize = chunkSize; - this.requestID = requestId; - this.streamBufferFlushSize = bufferFlushSize; - this.streamBufferMaxSize = bufferMaxSize; - this.blockSize = size; - this.watchTimeout = watchTimeout; - this.bytesPerChecksum = bytesPerChecksum; - this.checksumType = checksumType; - this.openID = openID; - this.excludeList = new ExcludeList(); - - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(streamBufferFlushSize > 0); - Preconditions.checkState(streamBufferMaxSize > 0); - Preconditions.checkState(blockSize > 0); - Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); - Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); - Preconditions.checkState(blockSize % streamBufferMaxSize == 0); - this.bufferPool = - new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize); - } - - public BlockOutputStreamEntryPool() { - streamEntries = new ArrayList<>(); - omClient = null; - keyArgs = null; - xceiverClientManager = null; - chunkSize = 0; - requestID = null; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferPool = new BufferPool(chunkSize, 1); - watchTimeout = 0; - blockSize = 0; - this.checksumType = ContainerProtos.ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - this.bytesPerChecksum = OzoneConfigKeys - .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB - currentStreamIndex = 0; - openID = -1; - } - - /** - * When a key is opened, it is possible that there are some blocks already - * allocated to it for this open session. In this case, to make use of these - * blocks, we need to add these blocks to stream entries. But, a key's version - * also includes blocks from previous versions, we need to avoid adding these - * old blocks to stream entries, because these old blocks should not be picked - * for write. To do this, the following method checks that, only those - * blocks created in this particular open version are added to stream entries. - * - * @param version the set of blocks that are pre-allocated. - * @param openVersion the version corresponding to the pre-allocation. - * @throws IOException - */ - public void addPreallocateBlocks(OmKeyLocationInfoGroup version, - long openVersion) throws IOException { - // server may return any number of blocks, (0 to any) - // only the blocks allocated in this open session (block createVersion - // equals to open session version) - for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { - if (subKeyInfo.getCreateVersion() == openVersion) { - addKeyLocationInfo(subKeyInfo); - } - } - } - - private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { - Preconditions.checkNotNull(subKeyInfo.getPipeline()); - UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); - BlockOutputStreamEntry.Builder builder = - new BlockOutputStreamEntry.Builder() - .setBlockID(subKeyInfo.getBlockID()) - .setKey(keyArgs.getKeyName()) - .setXceiverClientManager(xceiverClientManager) - .setPipeline(subKeyInfo.getPipeline()) - .setRequestId(requestID) - .setChunkSize(chunkSize) - .setLength(subKeyInfo.getLength()) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setWatchTimeout(watchTimeout) - .setbufferPool(bufferPool) - .setChecksumType(checksumType) - .setBytesPerChecksum(bytesPerChecksum) - .setToken(subKeyInfo.getToken()); - streamEntries.add(builder.build()); - } - - public List getLocationInfoList() { - List locationInfoList = new ArrayList<>(); - for (BlockOutputStreamEntry streamEntry : streamEntries) { - long length = streamEntry.getCurrentPosition(); - - // Commit only those blocks to OzoneManager which are not empty - if (length != 0) { - OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) - .setLength(streamEntry.getCurrentPosition()).setOffset(0) - .setToken(streamEntry.getToken()) - .setPipeline(streamEntry.getPipeline()).build(); - locationInfoList.add(info); - } - LOG.debug( - "block written " + streamEntry.getBlockID() + ", length " + length - + " bcsID " + streamEntry.getBlockID() - .getBlockCommitSequenceId()); - } - return locationInfoList; - } - - /** - * Discards the subsequent pre allocated blocks and removes the streamEntries - * from the streamEntries list for the container which is closed. - * @param containerID id of the closed container - * @param pipelineId id of the associated pipeline - */ - void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) { - // currentStreamIndex < streamEntries.size() signifies that, there are still - // pre allocated blocks available. - - // This will be called only to discard the next subsequent unused blocks - // in the streamEntryList. - if (currentStreamIndex + 1 < streamEntries.size()) { - ListIterator streamEntryIterator = - streamEntries.listIterator(currentStreamIndex + 1); - while (streamEntryIterator.hasNext()) { - BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); - Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); - if ((pipelineId != null && streamEntry.getPipeline().getId() - .equals(pipelineId)) || (containerID != -1 - && streamEntry.getBlockID().getContainerID() == containerID)) { - streamEntryIterator.remove(); - } - } - } - } - - List getStreamEntries() { - return streamEntries; - } - - XceiverClientManager getXceiverClientManager() { - return xceiverClientManager; - } - - String getKeyName() { - return keyArgs.getKeyName(); - } - - long getKeyLength() { - return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum(); - } - /** - * Contact OM to get a new block. Set the new block with the index (e.g. - * first block has index = 0, second has index = 1 etc.) - * - * The returned block is made to new BlockOutputStreamEntry to write. - * - * @throws IOException - */ - private void allocateNewBlock() throws IOException { - OmKeyLocationInfo subKeyInfo = - omClient.allocateBlock(keyArgs, openID, excludeList); - addKeyLocationInfo(subKeyInfo); - } - - - void commitKey(long offset) throws IOException { - if (keyArgs != null) { - // in test, this could be null - long length = getKeyLength(); - Preconditions.checkArgument(offset == length); - keyArgs.setDataSize(length); - keyArgs.setLocationInfoList(getLocationInfoList()); - // When the key is multipart upload part file upload, we should not - // commit the key, as this is not an actual key, this is a just a - // partial key of a large file. - if (keyArgs.getIsMultipartKey()) { - commitUploadPartInfo = - omClient.commitMultipartUploadPart(keyArgs, openID); - } else { - omClient.commitKey(keyArgs, openID); - } - } else { - LOG.warn("Closing KeyOutputStream, but key args is null"); - } - } - - public BlockOutputStreamEntry getCurrentStreamEntry() { - if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) { - return null; - } else { - return streamEntries.get(currentStreamIndex); - } - } - - BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { - BlockOutputStreamEntry streamEntry = getCurrentStreamEntry(); - if (streamEntry != null && streamEntry.isClosed()) { - // a stream entry gets closed either by : - // a. If the stream gets full - // b. it has encountered an exception - currentStreamIndex++; - } - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - int succeededAllocates = 0; - try { - allocateNewBlock(); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - return current; - } - - long computeBufferData() { - return bufferPool.computeBufferData(); - } - - void cleanup() { - if (excludeList != null) { - excludeList.clear(); - excludeList = null; - } - if (bufferPool != null) { - bufferPool.clearBufferPool(); - } - - if (streamEntries != null) { - streamEntries.clear(); - } - } - - public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - return commitUploadPartInfo; - } - - public ExcludeList getExcludeList() { - return excludeList; - } - - public long getStreamBufferMaxSize() { - return streamBufferMaxSize; - } - - boolean isEmpty() { - return streamEntries.isEmpty(); - } -} \ No newline at end of file diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 00c4d02715..c4c2524fea 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -23,18 +23,21 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; @@ -44,8 +47,10 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.List; import java.util.Collection; +import java.util.ListIterator; import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -72,41 +77,84 @@ enum StreamAction { public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); + // array list's get(index) is O(1) + private final ArrayList streamEntries; + private int currentStreamIndex; + private final OzoneManagerProtocol omClient; + private final OmKeyArgs keyArgs; + private final long openID; + private final XceiverClientManager xceiverClientManager; + private final int chunkSize; + private final String requestID; private boolean closed; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; + private final int bytesPerChecksum; + private final ChecksumType checksumType; + private final BufferPool bufferPool; + private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private FileEncryptionInfo feInfo; + private ExcludeList excludeList; private final Map, RetryPolicy> retryPolicyMap; private int retryCount; private long offset; - private final BlockOutputStreamEntryPool blockOutputStreamEntryPool; - /** * A constructor for testing purpose only. */ @VisibleForTesting + @SuppressWarnings("parameternumber") public KeyOutputStream() { + streamEntries = new ArrayList<>(); + omClient = null; + keyArgs = null; + openID = -1; + xceiverClientManager = null; + chunkSize = 0; + requestID = null; closed = false; - this.retryPolicyMap = HddsClientUtils.getExceptionList() + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + bufferPool = new BufferPool(chunkSize, 1); + watchTimeout = 0; + blockSize = 0; + this.checksumType = ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + this.bytesPerChecksum = OzoneConfigKeys + .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB + this.retryPolicyMap = OzoneClientUtils.getExceptionList() .stream() .collect(Collectors.toMap(Function.identity(), e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); retryCount = 0; offset = 0; - blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(); } @VisibleForTesting public List getStreamEntries() { - return blockOutputStreamEntryPool.getStreamEntries(); + return streamEntries; } - @VisibleForTesting public XceiverClientManager getXceiverClientManager() { - return blockOutputStreamEntryPool.getXceiverClientManager(); + return xceiverClientManager; } - @VisibleForTesting - public List getLocationInfoList() { - return blockOutputStreamEntryPool.getLocationInfoList(); + public List getLocationInfoList() throws IOException { + List locationInfoList = new ArrayList<>(); + for (BlockOutputStreamEntry streamEntry : streamEntries) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID()) + .setLength(streamEntry.getCurrentPosition()).setOffset(0) + .setToken(streamEntry.getToken()) + .setPipeline(streamEntry.getPipeline()) + .build(); + LOG.debug("block written " + streamEntry.getBlockID() + ", length " + + streamEntry.getCurrentPosition() + " bcsID " + + streamEntry.getBlockID().getBlockCommitSequenceId()); + locationInfoList.add(info); + } + return locationInfoList; } @VisibleForTesting @@ -123,16 +171,41 @@ public KeyOutputStream(OpenKeySession handler, ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, int maxRetryCount, long retryInterval) { + this.streamEntries = new ArrayList<>(); + this.currentStreamIndex = 0; + this.omClient = omClient; OmKeyInfo info = handler.getKeyInfo(); - blockOutputStreamEntryPool = - new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, - type, bufferFlushSize, bufferMaxSize, size, watchTimeout, - checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart, - info, xceiverClientManager, handler.getId()); // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); - this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setType(type).setFactor(factor).setDataSize(info.getDataSize()) + .setIsMultipartKey(isMultipart).setMultipartUploadID( + uploadID).setMultipartUploadPartNumber(partNumber) + .build(); + this.openID = handler.getId(); + this.xceiverClientManager = xceiverClientManager; + this.chunkSize = chunkSize; + this.requestID = requestId; + this.streamBufferFlushSize = bufferFlushSize; + this.streamBufferMaxSize = bufferMaxSize; + this.blockSize = size; + this.watchTimeout = watchTimeout; + this.bytesPerChecksum = bytesPerChecksum; + this.checksumType = checksumType; + + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); + Preconditions.checkState(blockSize % streamBufferMaxSize == 0); + this.bufferPool = + new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize); + this.excludeList = new ExcludeList(); + this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException( maxRetryCount, retryInterval); this.retryCount = 0; } @@ -152,7 +225,37 @@ public KeyOutputStream(OpenKeySession handler, */ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) throws IOException { - blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); + // server may return any number of blocks, (0 to any) + // only the blocks allocated in this open session (block createVersion + // equals to open session version) + for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { + if (subKeyInfo.getCreateVersion() == openVersion) { + addKeyLocationInfo(subKeyInfo); + } + } + } + + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) + throws IOException { + Preconditions.checkNotNull(subKeyInfo.getPipeline()); + UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); + BlockOutputStreamEntry.Builder builder = + new BlockOutputStreamEntry.Builder() + .setBlockID(subKeyInfo.getBlockID()) + .setKey(keyArgs.getKeyName()) + .setXceiverClientManager(xceiverClientManager) + .setPipeline(subKeyInfo.getPipeline()) + .setRequestId(requestID) + .setChunkSize(chunkSize) + .setLength(subKeyInfo.getLength()) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setbufferPool(bufferPool) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) + .setToken(subKeyInfo.getToken()); + streamEntries.add(builder.build()); } @Override @@ -191,12 +294,34 @@ public void write(byte[] b, int off, int len) handleWrite(b, off, len, false); } + private long computeBufferData() { + return bufferPool.computeBufferData(); + } + private void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { + int succeededAllocates = 0; while (len > 0) { try { - BlockOutputStreamEntry current = - blockOutputStreamEntryPool.allocateBlockIfNeeded(); + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + try { + allocateNewBlock(currentStreamIndex); + succeededAllocates += 1; + } catch (IOException ioe) { + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + + " blocks for this write."); + throw ioe; + } + } + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); + // length(len) will be in int range if the call is happening through // write API of blockOutputStream. Length can be in long range if it // comes via Exception path. @@ -217,8 +342,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) // to or less than the max length of the buffer allocated. // The len specified here is the combined sum of the data length of // the buffers - Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool - .getStreamBufferMaxSize()); + Preconditions.checkState(!retry || len <= streamBufferMaxSize); int dataWritten = (int) (current.getWrittenDataLength() - currentPos); writeLen = retry ? (int) len : dataWritten; // In retry path, the data written is already accounted in offset. @@ -226,7 +350,7 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) offset += writeLen; } LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, ioe); + handleException(current, currentStreamIndex, ioe); } if (current.getRemaining() <= 0) { // since the current block is already written close the stream. @@ -241,19 +365,80 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) } } + /** + * Discards the subsequent pre allocated blocks and removes the streamEntries + * from the streamEntries list for the container which is closed. + * @param containerID id of the closed container + * @param pipelineId id of the associated pipeline + * @param streamIndex index of the stream + */ + private void discardPreallocatedBlocks(long containerID, + PipelineID pipelineId, int streamIndex) { + // streamIndex < streamEntries.size() signifies that, there are still + // pre allocated blocks available. + + // This will be called only to discard the next subsequent unused blocks + // in the streamEntryList. + if (streamIndex < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(streamIndex); + while (streamEntryIterator.hasNext()) { + BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); + Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); + if (((pipelineId != null && streamEntry.getPipeline().getId() + .equals(pipelineId)) || (containerID != -1 + && streamEntry.getBlockID().getContainerID() == containerID))) { + streamEntryIterator.remove(); + } + } + } + } + + /** + * It might be possible that the blocks pre allocated might never get written + * while the stream gets closed normally. In such cases, it would be a good + * idea to trim down the locationInfoList by removing the unused blocks if any + * so as only the used block info gets updated on OzoneManager during close. + */ + private void removeEmptyBlocks() { + if (currentStreamIndex < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex); + while (streamEntryIterator.hasNext()) { + if (streamEntryIterator.next().getCurrentPosition() == 0) { + streamEntryIterator.remove(); + } + } + } + } + + private void cleanup() { + if (excludeList != null) { + excludeList.clear(); + excludeList = null; + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in - * datanode. + * datanode. * b. Reads the data from the underlying buffer and writes it the next stream. * * @param streamEntry StreamEntry - * @param exception actual exception that occurred + * @param streamIndex Index of the entry + * @param exception actual exception that occurred * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - IOException exception) throws IOException { - Throwable t = HddsClientUtils.checkForException(exception); + int streamIndex, IOException exception) throws IOException { + Throwable t = checkForException(exception); boolean retryFailure = checkForRetryFailure(t); boolean closedContainerException = false; if (!retryFailure) { @@ -263,19 +448,15 @@ private void handleException(BlockOutputStreamEntry streamEntry, long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); - long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData(); - LOG.debug( - "Encountered exception {}. The last committed block length is {}, " + long bufferedDataLen = computeBufferData(); + LOG.warn("Encountered exception {}. The last committed block length is {}, " + "uncommitted data length is {} retry count {}", exception, totalSuccessfulFlushedData, bufferedDataLen, retryCount); - Preconditions.checkArgument( - bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize()); - Preconditions.checkArgument( - offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); + Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); + Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); - ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList(); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } @@ -289,42 +470,45 @@ private void handleException(BlockOutputStreamEntry streamEntry, // just clean up the current stream. streamEntry.cleanup(retryFailure); - // discard all subsequent blocks the containers and pipelines which + // discard all sunsequent blocks the containers and pipelines which // are in the exclude list so that, the very next retry should never // write data on the closed container/pipeline if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - blockOutputStreamEntryPool - .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), - null); + discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, + streamIndex + 1); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the - // pipeline, just discard all the pre allocated blocks on this pipeline. + // pipeline, just discard all the preallocated blocks on this pipeline. // Next block allocation will happen with excluding this specific pipeline // This will ensure if 2 way commit happens , it cannot span over multiple // blocks - blockOutputStreamEntryPool - .discardPreallocatedBlocks(-1, pipelineId); + discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1); } if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. + currentStreamIndex += 1; handleRetry(exception, bufferedDataLen); // reset the retryCount after handling the exception retryCount = 0; } + if (totalSuccessfulFlushedData == 0) { + streamEntries.remove(streamIndex); + currentStreamIndex -= 1; + } } private void markStreamClosed() { - blockOutputStreamEntryPool.cleanup(); + cleanup(); closed = true; } private void handleRetry(IOException exception, long len) throws IOException { - RetryPolicy retryPolicy = retryPolicyMap - .get(HddsClientUtils.checkForException(exception).getClass()); + RetryPolicy retryPolicy = + retryPolicyMap.get(checkForException(exception).getClass()); if (retryPolicy == null) { retryPolicy = retryPolicyMap.get(Exception.class); } @@ -360,11 +544,10 @@ private void handleRetry(IOException exception, long len) throws IOException { } } retryCount++; - LOG.trace("Retrying Write request. Already tried " + retryCount - + " time(s); retry policy is " + retryPolicy); + LOG.trace("Retrying Write request. Already tried " + + retryCount + " time(s); retry policy is " + retryPolicy); handleWrite(null, 0, len, true); } - /** * Checks if the provided exception signifies retry failure in ratis client. * In case of retry failure, ratis client throws RaftRetryFailureException @@ -379,6 +562,40 @@ private boolean checkIfContainerIsClosed(Throwable t) { return t instanceof ContainerNotOpenException; } + public Throwable checkForException(IOException ioe) throws IOException { + Throwable t = ioe.getCause(); + while (t != null) { + for (Class cls : OzoneClientUtils + .getExceptionList()) { + if (cls.isInstance(t)) { + return t; + } + } + t = t.getCause(); + } + throw ioe; + } + + private long getKeyLength() { + return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()) + .sum(); + } + + /** + * Contact OM to get a new block. Set the new block with the index (e.g. + * first block has index = 0, second has index = 1 etc.) + * + * The returned block is made to new BlockOutputStreamEntry to write. + * + * @param index the index of the block. + * @throws IOException + */ + private void allocateNewBlock(int index) throws IOException { + OmKeyLocationInfo subKeyInfo = + omClient.allocateBlock(keyArgs, openID, excludeList); + addKeyLocationInfo(subKeyInfo); + } + @Override public void flush() throws IOException { checkNotClosed(); @@ -395,19 +612,20 @@ public void flush() throws IOException { * written to new stream , it will be at max half full. In such cases, we * should just write the data and not close the stream as the block won't be * completely full. - * * @param op Flag which decides whether to call close or flush on the - * outputStream. + * outputStream. * @throws IOException In case, flush or close fails with exception. */ private void handleFlushOrClose(StreamAction op) throws IOException { - if (blockOutputStreamEntryPool.isEmpty()) { + if (streamEntries.size() == 0) { return; } while (true) { try { - BlockOutputStreamEntry entry = - blockOutputStreamEntryPool.getCurrentStreamEntry(); + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? size - 1 : currentStreamIndex; + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); if (entry != null) { try { Collection failedServers = @@ -415,8 +633,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException { // failed servers can be null in case there is no data written in // the stream if (failedServers != null && !failedServers.isEmpty()) { - blockOutputStreamEntryPool.getExcludeList() - .addDatanodes(failedServers); + excludeList.addDatanodes(failedServers); } switch (op) { case CLOSE: @@ -425,6 +642,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException { case FULL: if (entry.getRemaining() == 0) { entry.close(); + currentStreamIndex++; } break; case FLUSH: @@ -434,7 +652,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException { throw new IOException("Invalid Operation"); } } catch (IOException ioe) { - handleException(entry, ioe); + handleException(entry, streamIndex, ioe); continue; } } @@ -459,16 +677,34 @@ public void close() throws IOException { closed = true; try { handleFlushOrClose(StreamAction.CLOSE); - blockOutputStreamEntryPool.commitKey(offset); + if (keyArgs != null) { + // in test, this could be null + removeEmptyBlocks(); + long length = getKeyLength(); + Preconditions.checkArgument(offset == length); + keyArgs.setDataSize(length); + keyArgs.setLocationInfoList(getLocationInfoList()); + // When the key is multipart upload part file upload, we should not + // commit the key, as this is not an actual key, this is a just a + // partial key of a large file. + if (keyArgs.getIsMultipartKey()) { + commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs, + openID); + } else { + omClient.commitKey(keyArgs, openID); + } + } else { + LOG.warn("Closing KeyOutputStream, but key args is null"); + } } catch (IOException ioe) { throw ioe; } finally { - blockOutputStreamEntryPool.cleanup(); + cleanup(); } } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - return blockOutputStreamEntryPool.getCommitUploadPartInfo(); + return commitUploadPartInfo; } public FileEncryptionInfo getFileEncryptionInfo() { @@ -477,7 +713,7 @@ public FileEncryptionInfo getFileEncryptionInfo() { @VisibleForTesting public ExcludeList getExcludeList() { - return blockOutputStreamEntryPool.getExcludeList(); + return excludeList; } /** @@ -503,6 +739,7 @@ public static class Builder { private int maxRetryCount; private long retryInterval; + public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; return this; @@ -523,7 +760,8 @@ public Builder setXceiverClientManager(XceiverClientManager manager) { return this; } - public Builder setOmClient(OzoneManagerProtocol client) { + public Builder setOmClient( + OzoneManagerProtocol client) { this.omClient = client; return this; } @@ -568,12 +806,12 @@ public Builder setWatchTimeout(long timeout) { return this; } - public Builder setChecksumType(ChecksumType cType) { + public Builder setChecksumType(ChecksumType cType){ this.checksumType = cType; return this; } - public Builder setBytesPerChecksum(int bytes) { + public Builder setBytesPerChecksum(int bytes){ this.bytesPerChecksum = bytes; return this; } @@ -593,9 +831,9 @@ public Builder setRetryInterval(long retryIntervalInMS) { return this; } - public KeyOutputStream build() { - return new KeyOutputStream(openHandler, xceiverManager, omClient, - chunkSize, requestID, factor, type, streamBufferFlushSize, + public KeyOutputStream build() throws IOException { + return new KeyOutputStream(openHandler, xceiverManager, + omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, maxRetryCount, retryInterval); @@ -610,8 +848,8 @@ public KeyOutputStream build() { private void checkNotClosed() throws IOException { if (closed) { throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " - + blockOutputStreamEntryPool.getKeyName()); + ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs + .getKeyName()); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index dfccb98ea0..89a2af966a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.container.common.helpers + .ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -75,23 +75,27 @@ public class TestBlockOutputStreamWithFailures { * * @throws IOException */ - @Before public void init() throws Exception { + @Before + public void init() throws Exception { chunkSize = 100; flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; - conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s"); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) - .setBlockSize(blockSize).setChunkSize(chunkSize) + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -110,24 +114,25 @@ private String getKeyName() { /** * Shutdown MiniDFSCluster. */ - @After public void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } } - @Test public void testWatchForCommitWithCloseContainerException() - throws Exception { + @Test + public void testWatchForCommitWithCloseContainerException() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -150,14 +155,15 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -193,7 +199,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -226,8 +233,9 @@ private String getKeyName() { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -239,7 +247,8 @@ private String getKeyName() { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, @@ -250,23 +259,25 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testWatchForCommitDatanodeFailure() throws Exception { + @Test + public void testWatchForCommitDatanodeFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -288,13 +299,14 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -332,7 +344,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -363,7 +376,8 @@ private String getKeyName() { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty @@ -382,23 +396,25 @@ private String getKeyName() { // 4 flushes at flushSize boundaries + 2 flush for partial chunks Assert.assertEquals(putBlockCount + 6, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 16, + metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void test2DatanodesFailure() throws Exception { + @Test + public void test2DatanodesFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -420,13 +436,14 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -462,7 +479,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -494,7 +512,7 @@ private String getKeyName() { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -504,7 +522,8 @@ private String getKeyName() { key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -514,27 +533,30 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } - @Test public void testFailureWithPrimeSizedData() throws Exception { + @Test + public void testFailureWithPrimeSizedData() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -555,21 +577,24 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -588,7 +613,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -615,7 +641,7 @@ private String getKeyName() { key.flush(); Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -627,7 +653,8 @@ private String getKeyName() { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -637,24 +664,26 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0); + Assert.assertEquals(totalOpCount + 9, + metrics.getTotalOpCount()); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testExceptionDuringClose() throws Exception { + @Test + public void testExceptionDuringClose() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); @@ -675,21 +704,24 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 1, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getTotalDataFlushedLength()); Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); @@ -708,7 +740,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 1, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -734,14 +767,15 @@ private String getKeyName() { // now close the stream, It will hit exception key.close(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -751,24 +785,26 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 9, + metrics.getTotalOpCount()); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception { + @Test + public void testWatchForCommitWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -792,14 +828,15 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -835,7 +872,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, @@ -869,7 +907,7 @@ private String getKeyName() { // and one flush for partial chunk key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); @@ -881,9 +919,10 @@ private String getKeyName() { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -892,23 +931,25 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception { + @Test + public void testDatanodeFailureWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -931,13 +972,14 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -973,7 +1015,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1001,7 +1044,7 @@ private String getKeyName() { key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled @@ -1009,7 +1052,8 @@ private String getKeyName() { Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -1029,25 +1073,27 @@ private String getKeyName() { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testDatanodeFailureWithPreAllocation() throws Exception { + @Test + public void testDatanodeFailureWithPreAllocation() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); - long putBlockCount = - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); long totalOpCount = metrics.getTotalOpCount(); String keyName = getKeyName(); OzoneOutputStream key = @@ -1071,13 +1117,14 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 2, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3); - OutputStream stream = - keyOutputStream.getStreamEntries().get(0).getOutputStream(); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; @@ -1113,7 +1160,8 @@ private String getKeyName() { metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures @@ -1140,7 +1188,7 @@ private String getKeyName() { key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof RaftRetryFailureException); // Make sure the retryCount is reset after the exception is handled @@ -1149,12 +1197,13 @@ private String getKeyName() { // now close the stream, It will update the ack length after watchForCommit key.close(); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1170,7 +1219,8 @@ private String getKeyName() { // flush failed + 3 more flushes for the next block Assert.assertEquals(putBlockCount + 8, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index e551ab1ae2..8be1cccb50 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -291,7 +291,7 @@ public void testMultiBlockWrites3() throws Exception { (KeyOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); - // write data 4 blocks and one more chunk + // write data 3 blocks and one more chunk byte[] writtenData = ContainerTestHelper.getFixedLengthString(keyString, keyLen) .getBytes(UTF_8); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 5f6d494a24..5cb6dbc047 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; @@ -51,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -85,7 +85,7 @@ public void init() throws Exception { blockSize = 2 * maxFlushSize; conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); @@ -150,7 +150,7 @@ public void testGroupMismatchExceptionHandling() throws Exception { .getPipeline(container.getPipelineID()); ContainerTestHelper.waitForPipelineClose(key, cluster, true); key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); @@ -201,7 +201,7 @@ public void testMaxRetriesByOzoneClient() throws Exception { key.write(data1); Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java deleted file mode 100644 index 90292eae09..0000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ /dev/null @@ -1,501 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.client.rpc; - -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.*; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.KeyOutputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.RaftRetryFailureException; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; - -/** - * This class verifies the watchForCommit Handling by xceiverClient. - */ -public class TestWatchForCommit { - - private MiniOzoneCluster cluster; - private OzoneClient client; - private ObjectStore objectStore; - private String volumeName; - private String bucketName; - private String keyString; - private int chunkSize; - private int flushSize; - private int maxFlushSize; - private int blockSize; - private StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static String containerOwner = "OZONE"; - - /** - * Create a MiniDFSCluster for testing. - *

- * Ozone is made active by setting OZONE_ENABLED = true - * - * @throws IOException - */ - private void startCluster(OzoneConfiguration conf) throws Exception { - chunkSize = 100; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - - conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration( - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, - 1, TimeUnit.SECONDS); - - conf.setQuietMode(false); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(7) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) - .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES) - .build(); - cluster.waitForClusterToBeReady(); - //the easiest way to create an open container is creating a key - client = OzoneClientFactory.getClient(conf); - objectStore = client.getObjectStore(); - keyString = UUID.randomUUID().toString(); - volumeName = "watchforcommithandlingtest"; - bucketName = volumeName; - objectStore.createVolume(volumeName); - objectStore.getVolume(volumeName).createBucket(bucketName); - storageContainerLocationClient = cluster - .getStorageContainerLocationClient(); - } - - - /** - * Shutdown MiniDFSCluster. - */ - private void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - private String getKeyName() { - return UUID.randomUUID().toString(); - } - - @Test - public void testWatchForCommitWithKeyWrite() throws Exception { - // in this case, watch request should fail with RaftRetryFailureException - // and will be captured in keyOutputStream and the failover will happen - // to a different block - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5); - startCluster(conf); - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); - long totalOpCount = metrics.getTotalOpCount(); - String keyName = getKeyName(); - OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 50; - // write data more than 1 chunk - byte[] data1 = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); - key.write(data1); - // since its hitting the full bufferCondition, it will call watchForCommit - // and completes atleast putBlock for first flushSize worth of data - Assert.assertTrue( - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) - <= pendingWriteChunkCount + 2); - Assert.assertTrue( - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) - <= pendingPutBlockCount + 1); - Assert.assertEquals(writeChunkCount + 4, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 2, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); - Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); - - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - - // we have just written data more than flush Size(2 chunks), at this time - // buffer pool will have 3 buffers allocated worth of chunk size - - Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); - // writtenDataLength as well flushedDataLength will be updated here - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - - Assert.assertEquals(maxFlushSize, - blockOutputStream.getTotalDataFlushedLength()); - - // since data equals to maxBufferSize is written, this will be a blocking - // call and hence will wait for atleast flushSize worth of data to get - // acked by all servers right here - Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); - - // watchForCommit will clean up atleast one entry from the map where each - // entry corresponds to flushSize worth of data - Assert.assertTrue( - blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); - - // Now do a flush. This will flush the data and update the flush length and - // the map. - key.flush(); - - Assert.assertEquals(pendingWriteChunkCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(pendingPutBlockCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(writeChunkCount + 5, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 3, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); - - // Since the data in the buffer is already flushed, flush here will have - // no impact on the counters and data structures - - Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - - Assert.assertEquals(dataLength, - blockOutputStream.getTotalDataFlushedLength()); - // flush will make sure one more entry gets updated in the map - Assert.assertTrue( - blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); - - XceiverClientRatis raftClient = - (XceiverClientRatis) blockOutputStream.getXceiverClient(); - Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); - Pipeline pipeline = raftClient.getPipeline(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - // again write data with more than max buffer limit. This will call - // watchForCommit again. Since the commit will happen 2 way, the - // commitInfoMap will get updated for servers which are alive - - // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here - // once exception is hit - key.write(data1); - - // As a part of handling the exception, 4 failed writeChunks will be - // rewritten plus one partial chunk plus two putBlocks for flushSize - // and one flush for partial chunk - key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream - .getIoException()) instanceof RaftRetryFailureException); - // Make sure the retryCount is reset after the exception is handled - Assert.assertTrue(keyOutputStream.getRetryCount() == 0); - // now close the stream, It will update the ack length after watchForCommit - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - key.close(); - Assert - .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); - Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); - Assert.assertEquals(pendingWriteChunkCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(pendingPutBlockCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(writeChunkCount + 14, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 8, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); - // make sure the bufferPool is empty - Assert - .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - validateData(keyName, data1); - shutdown(); - } - - @Test - public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - startCluster(conf); - XceiverClientManager clientManager = new XceiverClientManager(conf); - ContainerWithPipeline container1 = storageContainerLocationClient - .allocateContainer(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi xceiverClient = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, xceiverClient.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - xceiverClient.getPipeline()); - Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientReply reply = xceiverClient.sendCommandAsync( - ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - xceiverClient.getPipeline())); - reply.getResponse().get(); - long index = reply.getLogIndex(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - try { - // just watch for a lo index which in not updated in the commitInfo Map - xceiverClient.watchForCommit(index + 1, 3000); - Assert.fail("expected exception not thrown"); - } catch (Exception e) { - Assert.assertTrue( - HddsClientUtils.checkForException(e) instanceof TimeoutException); - } - // After releasing the xceiverClient, this connection should be closed - // and any container operations should fail - clientManager.releaseClient(xceiverClient, false); - shutdown(); - } - - @Test - public void testWatchForCommitForRetryfailure() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, - 100, TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - startCluster(conf); - XceiverClientManager clientManager = new XceiverClientManager(conf); - ContainerWithPipeline container1 = storageContainerLocationClient - .allocateContainer(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi xceiverClient = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, xceiverClient.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - xceiverClient.getPipeline()); - Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientReply reply = xceiverClient.sendCommandAsync( - ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - xceiverClient.getPipeline())); - reply.getResponse().get(); - long index = reply.getLogIndex(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - // again write data with more than max buffer limit. This wi - try { - // just watch for a lo index which in not updated in the commitInfo Map - xceiverClient.watchForCommit(index + 1, 20000); - Assert.fail("expected exception not thrown"); - } catch (Exception e) { - Assert.assertTrue(HddsClientUtils - .checkForException(e) instanceof RaftRetryFailureException); - } - clientManager.releaseClient(xceiverClient, false); - shutdown(); - } - - @Test - public void test2WayCommitForRetryfailure() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = storageContainerLocationClient - .allocateContainer(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi xceiverClient = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, xceiverClient.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - xceiverClient.getPipeline()); - Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - XceiverClientReply reply = xceiverClient.sendCommandAsync( - ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - xceiverClient.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = xceiverClient.sendCommandAsync(ContainerTestHelper - .getCloseContainer(pipeline, - container1.getContainerInfo().getContainerID())); - reply.getResponse().get(); - xceiverClient.watchForCommit(reply.getLogIndex(), 20000); - - // commitInfo Map will be reduced to 2 here - Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(xceiverClient, false); - Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); - Assert - .assertTrue(logCapturer.getOutput().contains("Committed by majority")); - logCapturer.stopCapturing(); - shutdown(); - } - - @Test - public void test2WayCommitForTimeoutException() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = storageContainerLocationClient - .allocateContainer(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi xceiverClient = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, xceiverClient.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - xceiverClient.getPipeline()); - Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - XceiverClientReply reply = xceiverClient.sendCommandAsync( - ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - xceiverClient.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = xceiverClient.sendCommandAsync(ContainerTestHelper - .getCloseContainer(pipeline, - container1.getContainerInfo().getContainerID())); - reply.getResponse().get(); - xceiverClient.watchForCommit(reply.getLogIndex(), 3000); - - // commitInfo Map will be reduced to 2 here - Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(xceiverClient, false); - Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); - Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); - Assert - .assertTrue(logCapturer.getOutput().contains("Committed by majority")); - logCapturer.stopCapturing(); - shutdown(); - } - - @Test - public void testWatchForCommitForGroupMismatchException() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); - - // mark the node stale early so that pipleline gets destroyed quickly - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = storageContainerLocationClient - .allocateContainer(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, containerOwner); - XceiverClientSpi xceiverClient = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, xceiverClient.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - xceiverClient.getPipeline()); - Pipeline pipeline = xceiverClient.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - long containerId = container1.getContainerInfo().getContainerID(); - XceiverClientReply reply = xceiverClient.sendCommandAsync( - ContainerTestHelper.getCreateContainerRequest(containerId, - xceiverClient.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - List pipelineList = new ArrayList<>(); - pipelineList.add(pipeline); - ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); - try { - // just watch for a lo index which in not updated in the commitInfo Map - xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000); - Assert.fail("Expected exception not thrown"); - } catch(Exception e) { - Assert.assertTrue(HddsClientUtils - .checkForException(e) instanceof GroupMismatchException); - } - clientManager.releaseClient(xceiverClient, false); - shutdown(); - } - - private OzoneOutputStream createKey(String keyName, ReplicationType type, - long size) throws Exception { - return ContainerTestHelper - .createKey(keyName, type, size, objectStore, volumeName, bucketName); - } - - private void validateData(String keyName, byte[] data) throws Exception { - ContainerTestHelper - .validateData(keyName, data, objectStore, volumeName, bucketName); - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 4da190762b..463e42889a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -57,7 +57,6 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -69,6 +68,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; @@ -723,11 +723,11 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, MiniOzoneCluster cluster) throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List streamEntryList = - keyOutputStream.getStreamEntries(); + List locationInfoList = + keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); - for (BlockOutputStreamEntry entry : streamEntryList) { - long id = entry.getBlockID().getContainerID(); + for (OmKeyLocationInfo info : locationInfoList) { + long id = info.getContainerID(); if (!containerIdList.contains(id)) { containerIdList.add(id); } @@ -741,14 +741,11 @@ public static void waitForPipelineClose(OzoneOutputStream outputStream, throws Exception { KeyOutputStream keyOutputStream = (KeyOutputStream) outputStream.getOutputStream(); - List streamEntryList = - keyOutputStream.getStreamEntries(); + List locationInfoList = + keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); - for (BlockOutputStreamEntry entry : streamEntryList) { - long id = entry.getBlockID().getContainerID(); - if (!containerIdList.contains(id)) { - containerIdList.add(id); - } + for (OmKeyLocationInfo info : locationInfoList) { + containerIdList.add(info.getContainerID()); } Assert.assertTrue(!containerIdList.isEmpty()); waitForPipelineClose(cluster, waitForContainerCreation, @@ -787,12 +784,6 @@ public static void waitForPipelineClose(MiniOzoneCluster cluster, } } } - waitForPipelineClose(pipelineList, cluster); - } - - public static void waitForPipelineClose(List pipelineList, - MiniOzoneCluster cluster) - throws TimeoutException, InterruptedException, IOException { for (Pipeline pipeline1 : pipelineList) { // issue pipeline destroy command cluster.getStorageContainerManager().getPipelineManager() diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index f56ce80bf8..1811387e35 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -29,7 +29,7 @@ 3.2.0 0.5.0-SNAPSHOT 0.5.0-SNAPSHOT - 0.4.0-fe2b15d-SNAPSHOT + 0.3.0 1.60 Crater Lake ${ozone.version}