diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 13c4a0c729..139f49404c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -48,16 +48,12 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
@@ -97,7 +93,6 @@ public class BlockOutputStream extends OutputStream {
   private int chunkSize;
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
-  private final long watchTimeout;
   private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
@@ -111,10 +106,6 @@ public class BlockOutputStream extends OutputStream {
   // effective data write attempted so far for the block
   private long writtenDataLength;
 
-  // total data which has been successfully flushed and acknowledged
-  // by all servers
-  private long totalAckDataLength;
-
   // List containing buffers for which the putBlock call will
   // update the length in the datanodes. This list will just maintain
   // references to the buffers in the BufferPool which will be cleared
@@ -123,17 +114,10 @@ public class BlockOutputStream extends OutputStream {
   // which got written between successive putBlock calls.
   private List<ByteBuffer> bufferList;
 
-  // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureMap;
-
-  // The map should maintain the keys (logIndexes) in order so that while
-  // removing we always end up updating incremented data flushed length.
+  // This object will maintain the commitIndexes and byteBufferList in order
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
-      commitIndex2flushedDataMap;
+  private final CommitWatcher commitWatcher;
 
   private List<DatanodeDetails> failedServers;
 
@@ -175,20 +159,17 @@ public BlockOutputStream(BlockID blockID, String key,
     this.chunkIndex = 0;
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
-    this.watchTimeout = watchTimeout;
     this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
-    totalAckDataLength = 0;
-    futureMap = new ConcurrentHashMap<>();
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
+    bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
-    bufferList = null;
     ioException = new AtomicReference<>(null);
   }
 
@@ -198,7 +179,7 @@ public BlockID getBlockID() {
   }
 
   public long getTotalAckDataLength() {
-    return totalAckDataLength;
+    return commitWatcher.getTotalAckDataLength();
   }
 
   public long getWrittenDataLength() {
@@ -230,7 +211,7 @@ public IOException getIoException() {
 
   @VisibleForTesting
   public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
-    return commitIndex2flushedDataMap;
+    return commitWatcher.getCommitIndex2flushedDataMap();
   }
 
   @Override
@@ -333,34 +314,6 @@ public void writeOnRetry(long len) throws IOException {
     }
   }
 
-  /**
-   * just update the totalAckDataLength. In case of failure,
-   * we will read the data starting from totalAckDataLength.
-   */
-  private void updateFlushIndex(List<Long> indexes) {
-    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
-    for (long index : indexes) {
-      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
-      long length = buffers.stream().mapToLong(value -> {
-        int pos = value.position();
-        Preconditions.checkArgument(pos <= chunkSize);
-        return pos;
-      }).sum();
-      // totalAckDataLength replicated yet should always be incremented
-      // with the current length being returned from commitIndex2flushedDataMap.
-      totalAckDataLength += length;
-      LOG.debug("Total data successfully replicated: " + totalAckDataLength);
-      futureMap.remove(totalAckDataLength);
-      // Flush has been committed to required servers successful.
-      // just release the current buffer from the buffer pool corresponding
-      // to the buffers that have been committed with the putBlock call.
-      for (ByteBuffer byteBuffer : buffers) {
-        bufferPool.releaseBuffer(byteBuffer);
-      }
-    }
-  }
-
   /**
    * This is a blocking call. It will wait for the flush till the commit index
    * at the head of the commitIndex2flushedDataMap gets replicated to all or
@@ -370,7 +323,7 @@ private void updateFlushIndex(List<Long> indexes) {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!futureMap.isEmpty()) {
+      if (!commitWatcher.getFutureMap().isEmpty()) {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
@@ -378,47 +331,31 @@ private void handleFullBuffer() throws IOException {
       adjustBuffersOnException();
       throw getIoException();
     }
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      watchForCommit(
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .min().getAsLong());
-    }
+    watchForCommit(true);
   }
 
-  private void adjustBuffers(long commitIndex) {
-    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
-        .filter(p -> p <= commitIndex).collect(Collectors.toList());
-    if (keyList.isEmpty()) {
-      return;
-    } else {
-      updateFlushIndex(keyList);
-    }
-  }
 
   // It may happen that once the exception is encountered , we still might
   // have successfully flushed up to a certain index. Make sure the buffers
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
-    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+    commitWatcher.releaseBuffersOnException();
   }
 
   /**
    * calls watchForCommit API of the Ratis Client. For Standalone client,
    * it is a no op.
-   * @param commitIndex log index to watch for
+   * @param bufferFull flag indicating whether bufferFull condition is hit or
+   *                   it's called as part of flush/close
    * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case watch gets timed out
    */
-  private void watchForCommit(long commitIndex) throws IOException {
+  private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
-    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
-    long index;
     try {
-      XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex, watchTimeout);
-      if (reply == null) {
-        index = 0;
-      } else {
+      XceiverClientReply reply = bufferFull ?
+          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
           if (failedServers.isEmpty()) {
@@ -426,13 +363,9 @@ private void watchForCommit(long commitIndex) throws IOException {
           }
           failedServers.addAll(dnList);
         }
-        index = reply.getLogIndex();
       }
-      adjustBuffers(index);
-    } catch (TimeoutException | InterruptedException | ExecutionException e) {
-      LOG.warn("watchForCommit failed for index " + commitIndex, e);
-      setIoException(e);
-      adjustBuffersOnException();
+    } catch (IOException ioe) {
+      setIoException(ioe);
       throw getIoException();
     }
   }
@@ -471,14 +404,14 @@ ContainerCommandResponseProto> executePutBlock()
           blockID = responseBlockID;
           LOG.debug(
               "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                  + commitIndex2flushedDataMap.size() + " flushLength "
+                  + commitWatcher.getCommitInfoMapSize() + " flushLength "
                   + flushPos + " numBuffers " + byteBufferList.size()
                   + " blockID " + blockID + " bufferPool size" + bufferPool
                   .getSize() + " currentBufferIndex " + bufferPool
                   .getCurrentBufferIndex());
           // for standalone protocol, logIndex will always be 0.
-          commitIndex2flushedDataMap
-              .put(asyncReply.getLogIndex(), byteBufferList);
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -493,7 +426,7 @@ ContainerCommandResponseProto> executePutBlock()
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    futureMap.put(flushPos, flushFuture);
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
     return flushFuture;
   }
 
@@ -553,18 +486,7 @@ private void handleFlush()
       executePutBlock();
     }
     waitOnFlushFutures();
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      // wait for the last commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long lastIndex =
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .max().getAsLong();
-      LOG.debug(
-          "waiting for last flush Index " + lastIndex + " to catch up");
-      watchForCommit(lastIndex);
-    }
-
+    watchForCommit(false);
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded
@@ -594,11 +516,11 @@ public void close() throws IOException {
     }
   }
 
-
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
+        commitWatcher.getFutureMap().values().toArray(
+            new CompletableFuture[commitWatcher.getFutureMap().size()]));
     // wait for all the transactions to complete
     combinedFuture.get();
   }
@@ -637,18 +559,11 @@ public void cleanup(boolean invalidateClient) {
     }
     xceiverClientManager = null;
     xceiverClient = null;
-    if (futureMap != null) {
-      futureMap.clear();
-    }
-    futureMap = null;
+    commitWatcher.cleanup();
     if (bufferList != null) {
       bufferList.clear();
     }
     bufferList = null;
-    if (commitIndex2flushedDataMap != null) {
-      commitIndex2flushedDataMap.clear();
-    }
-    commitIndex2flushedDataMap = null;
     responseExecutor.shutdown();
   }
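
With the change above, BlockOutputStream keeps only the bufferList it is currently filling; every acknowledgement-ordered structure (commitIndex2flushedDataMap, futureMap, totalAckDataLength) moves behind the CommitWatcher introduced in the next file. The following standalone sketch (plain Java, no Ozone dependencies; the names are illustrative, not the patch's API) shows the bookkeeping contract the new class is responsible for: record the bytes acknowledged by each log index, pick the lowest pending index when the buffer pool is full and the highest on flush/close, and release everything up to whatever index the pipeline confirms.

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class CommitWatcherSketch {
      // log index -> number of bytes whose durability that index acknowledges
      private final ConcurrentSkipListMap<Long, Long> indexToBytes =
          new ConcurrentSkipListMap<>();
      private long totalAckDataLength = 0;

      public void track(long logIndex, long bytes) {
        indexToBytes.put(logIndex, bytes);
      }

      // bufferFull == true  -> watch the lowest pending index (free one buffer)
      // bufferFull == false -> watch the highest pending index (flush/close)
      public long indexToWatch(boolean bufferFull) {
        return bufferFull ? indexToBytes.firstKey() : indexToBytes.lastKey();
      }

      // Called once the pipeline acknowledges everything up to ackedIndex.
      public long releaseUpTo(long ackedIndex) {
        for (Map.Entry<Long, Long> e :
            indexToBytes.headMap(ackedIndex, true).entrySet()) {
          totalAckDataLength += e.getValue();
          indexToBytes.remove(e.getKey());
        }
        return totalAckDataLength;
      }

      public static void main(String[] args) {
        CommitWatcherSketch w = new CommitWatcherSketch();
        w.track(10, 4096);
        w.track(17, 4096);
        System.out.println(w.indexToWatch(true));   // 10
        System.out.println(w.releaseUpTo(10));      // 4096
        System.out.println(w.indexToWatch(false));  // 17
      }
    }

Watching the lowest index frees one buffer's worth of data as soon as possible (the buffer-full path), while watching the highest drains the whole map (the flush/close path); this is why watchForCommit(boolean) above only needs a flag rather than an index.
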
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
new file mode 100644
index 0000000000..aeac941af9
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class maintains the map of the commitIndexes to be watched for
+ * successful replication in the datanodes in a given pipeline. It also
+ * releases the buffers associated with the user data back to
+ * {@link BufferPool} once the minimum replication criterion is achieved
+ * during an ozone key write.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class executes watchForCommit on a Ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ */
+public class CommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommitWatcher.class);
+
+  // A reference to the pool of buffers holding the data
+  private BufferPool bufferPool;
+
+  // The map should maintain the keys (logIndexes) in order so that while
+  // removing we always end up updating incremented data flushed length.
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
+      commitIndex2flushedDataMap;
+
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureMap;
+
+  private XceiverClientSpi xceiverClient;
+
+  private final long watchTimeout;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
+
+  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+      long watchTimeout) {
+    this.bufferPool = bufferPool;
+    this.xceiverClient = xceiverClient;
+    this.watchTimeout = watchTimeout;
+    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
+    totalAckDataLength = 0;
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Releases the buffers for the given list of commit indexes and updates
+   * the totalAckDataLength. In case of failure, we will read the data
+   * starting from totalAckDataLength.
+   */
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
+      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
+      long length = buffers.stream().mapToLong(value -> {
+        int pos = value.position();
+        return pos;
+      }).sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.releaseBuffer(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  public void updateCommitInfoMap(long index, List<ByteBuffer> byteBufferList) {
+    commitIndex2flushedDataMap
+        .put(index, byteBufferList);
+  }
+
+  int getCommitInfoMapSize() {
+    return commitIndex2flushedDataMap.size();
+  }
+
+  /**
+   * Calls watchForCommit for the first index in commitIndex2flushedDataMap
+   * via the Ratis client.
+   * @return reply from the raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnFirstIndex() throws IOException {
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      // wait for the first commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).min()
+              .getAsLong();
+      LOG.debug("waiting for first index " + index + " to catch up");
+      return watchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watchForCommit for the last index in commitIndex2flushedDataMap
+   * via the Ratis client.
+   * @return reply from the raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnLastIndex()
+      throws IOException {
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      // wait for the last commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).max()
+              .getAsLong();
+      LOG.debug("waiting for last flush Index " + index + " to catch up");
+      return watchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (keyList.isEmpty()) {
+      return;
+    } else {
+      releaseBuffers(keyList);
+    }
+  }
+
+  // It may happen that once the exception is encountered, we still might
+  // have successfully flushed up to a certain index. Make sure the buffers
+  // only contain data which has not been sufficiently replicated
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. For the standalone
+   * client, it is a no-op.
+   * @param commitIndex log index to watch for
+   * @return reply from the raft client for the watch request
+   * @throws IOException in case the watch gets timed out
+   */
+  public XceiverClientReply watchForCommit(long commitIndex)
+      throws IOException {
+    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
+    long index;
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      if (reply == null) {
+        index = 0;
+      } else {
+        index = reply.getLogIndex();
+      }
+      adjustBuffers(index);
+      return reply;
+    } catch (TimeoutException | InterruptedException | ExecutionException e) {
+      LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      IOException ioException = new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+      releaseBuffersOnException();
+      throw ioException;
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentSkipListMap<Long,
+      List<ByteBuffer>> getCommitIndex2flushedDataMap() {
+    return commitIndex2flushedDataMap;
+  }
+
+  public ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      getFutureMap() {
+    return futureMap;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
+  public void cleanup() {
+    if (commitIndex2flushedDataMap != null) {
+      commitIndex2flushedDataMap.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndex2flushedDataMap = null;
+  }
+}
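
Note the error contract of watchForCommit above: a Ratis-level TimeoutException, InterruptedException, or ExecutionException is wrapped into a single IOException, but only after releaseBuffersOnException() has reclaimed the buffers for whatever index the pipeline had already replicated, so a retry does not resend acknowledged data. A minimal sketch of that wrap-and-release shape, under assumed names (plain Java; in the patch the blocking call goes through XceiverClientSpi.watchForCommit):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public final class WatchContract {
      public static long watch(CompletableFuture<Long> replicatedIndex,
          long watchTimeoutMs) throws IOException {
        try {
          return replicatedIndex.get(watchTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
          if (e instanceof InterruptedException) {
            // Not in the patch; restoring interrupt status is generally good
            // practice when swallowing InterruptedException.
            Thread.currentThread().interrupt();
          }
          // In the patch, releaseBuffersOnException() runs at this point so
          // already replicated data is not re-sent on retry.
          throw new IOException(
              "Unexpected Storage Container Exception: " + e, e);
        }
      }

      public static void main(String[] args) throws Exception {
        CompletableFuture<Long> f = CompletableFuture.completedFuture(42L);
        System.out.println(watch(f, 1000)); // 42
      }
    }

Callers such as BlockOutputStream.watchForCommit(boolean) then only have to handle IOException, which is why the earlier three-way catch in that method collapses to a single catch clause in this patch.
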
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
new file mode 100644
index 0000000000..ea51900971
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.CommitWatcher;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Class to test CommitWatcher functionality.
+ */
+public class TestCommitWatcher {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static long flushSize;
+  private static long maxFlushSize;
+  private static long blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static String containerOwner = "OZONE";
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = (int) (1 * OzoneConsts.MB);
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
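
The setup above builds a size ladder in which each block holds two max-flush windows, each max-flush window two flushes, and each flush two chunks; the two putBlock calls the tests issue below therefore land exactly on the 1 MB and 2 MB cumulative-length boundaries that the assertions check. A quick plain-Java check of that arithmetic (not part of the test class):

    public class SizeLadder {
      public static void main(String[] args) {
        final long mb = 1024L * 1024L;       // OzoneConsts.MB
        int chunkSize = (int) (1 * mb);      // 1 MB chunks
        long flushSize = 2 * chunkSize;      // flush after two chunks
        long maxFlushSize = 2 * flushSize;   // buffer-full threshold
        long blockSize = 2 * maxFlushSize;   // 8 MB block
        System.out.printf("chunk=%d flush=%d maxFlush=%d block=%d%n",
            chunkSize, flushSize, maxFlushSize, blockSize);
      }
    }

Seven datanodes (rather than the three the RATIS/THREE pipeline needs) leave the cluster healthy even after the exception test shuts two pipeline members down.
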
+  @Test
+  public void testReleaseBuffers() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<ByteBuffer> bufferList = new ArrayList<>();
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      bufferList.clear();
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertTrue(replies.size() == 2);
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(chunkSize)).equals(future1));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(2 * chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(2 * chunkSize)).equals(future2));
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 2);
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    watcher.watchOnLastIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(1).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(2 * chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() == 2 * chunkSize);
+    Assert.assertTrue(watcher.getFutureMap().isEmpty());
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+  }
+
+  @Test
+  public void testReleaseBuffersOnException() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<ByteBuffer> bufferList = new ArrayList<>();
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      bufferList.clear();
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertTrue(replies.size() == 2);
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(chunkSize)).equals(future1));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertNotNull(watcher.getFutureMap().get(new Long(2 * chunkSize)));
+    Assert.assertTrue(
+        watcher.getFutureMap().get(new Long(2 * chunkSize)).equals(future2));
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 2);
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a higher index so as to ensure, it does an actual
+      // call to Ratis. Otherwise, it may just return in case the commitInfoMap
+      // is updated to the latest index in putBlock response.
+      watcher.watchForCommit(replies.get(1).getLogIndex() + 1);
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getCause() instanceof TimeoutException);
+    }
+    long lastIndex = replies.get(1).getLogIndex();
+    // Depending on the last successfully replicated commitIndex, either we
+    // discard only the 1st buffer or both buffers
+    Assert.assertTrue(ratisClient.getReplicatedMinCommitIndex() <= lastIndex);
+    if (ratisClient.getReplicatedMinCommitIndex() < replies.get(1)
+        .getLogIndex()) {
+      Assert.assertTrue(watcher.getTotalAckDataLength() == chunkSize);
+      Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().size() == 1);
+      Assert.assertTrue(watcher.getFutureMap().size() == 1);
+    } else {
+      Assert.assertTrue(watcher.getTotalAckDataLength() == 2 * chunkSize);
+      Assert.assertTrue(watcher.getFutureMap().isEmpty());
+      Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+    }
+  }
+}
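
Both tests exercise the invariant that futureMap is keyed by cumulative flush length: after the first putBlock the future sits under chunkSize, after the second under 2 * chunkSize, and releaseBuffers removes a key exactly when totalAckDataLength advances to it. A compact sketch of that bookkeeping in isolation (plain Java, hypothetical names; not the patch's API):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    public class FutureMapSketch {
      private final ConcurrentHashMap<Long, CompletableFuture<Void>> futureMap =
          new ConcurrentHashMap<>();
      private long totalAckDataLength = 0;

      public void onPutBlockIssued(long cumulativeLength,
          CompletableFuture<Void> future) {
        futureMap.put(cumulativeLength, future);
      }

      // Mirrors CommitWatcher.releaseBuffers: after acking `bytes` more data,
      // the future stored under the new cumulative length becomes removable.
      public void onAck(long bytes) {
        totalAckDataLength += bytes;
        CompletableFuture<Void> done = futureMap.remove(totalAckDataLength);
        if (done == null) {
          throw new IllegalStateException(
              "ack must line up with a putBlock boundary");
        }
      }

      public static void main(String[] args) {
        FutureMapSketch s = new FutureMapSketch();
        s.onPutBlockIssued(4096, CompletableFuture.completedFuture(null));
        s.onAck(4096);
        System.out.println(s.futureMap.isEmpty()); // true
      }
    }

This also explains the branch at the end of testReleaseBuffersOnException: if the pipeline only replicated up to the first putBlock's index before the datanodes went down, one entry remains in both maps; if it reached the second index, both maps drain completely.
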