From 4ac0404fe01c2266068b6fc54588e3a4bcec3e12 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Fri, 18 Jan 2019 11:08:35 -0800 Subject: [PATCH] HDDS-959. KeyOutputStream should handle retry failures. Contributed by Lokesh Jain. --- .../hadoop/hdds/scm/XceiverClientManager.java | 11 +- .../scm/client/ContainerOperationClient.java | 10 +- .../hdds/scm/storage/BlockInputStream.java | 2 +- .../hdds/scm/storage/BlockOutputStream.java | 17 +-- .../ozone/client/io/KeyInputStream.java | 2 +- .../ozone/client/io/KeyOutputStream.java | 109 ++++++++---------- .../TestContainerStateMachineIdempotency.java | 2 +- .../rpc/TestOzoneRpcClientAbstract.java | 2 +- .../ozone/scm/TestContainerSmallFile.java | 8 +- .../TestGetCommittedBlockLengthAndPutKey.java | 6 +- .../ozone/scm/TestXceiverClientManager.java | 52 +++++++-- 11 files changed, 127 insertions(+), 94 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index b2735bc79f..f3614635ea 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -129,11 +129,20 @@ public XceiverClientSpi acquireClient(Pipeline pipeline) * Releases a XceiverClientSpi after use. * * @param client client to release + * @param invalidateClient if true, invalidates the client in cache */ - public void releaseClient(XceiverClientSpi client) { + public void releaseClient(XceiverClientSpi client, boolean invalidateClient) { Preconditions.checkNotNull(client); synchronized (clientCache) { client.decrementReference(); + if (invalidateClient) { + Pipeline pipeline = client.getPipeline(); + String key = pipeline.getId().getId().toString() + pipeline.getType(); + XceiverClientSpi cachedClient = clientCache.getIfPresent(key); + if (cachedClient == client) { + clientCache.invalidate(key); + } + } } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 85b5d29f32..6dddc91dff 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -100,7 +100,7 @@ public ContainerWithPipeline createContainer(String owner) return containerWithPipeline; } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -191,7 +191,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, return containerWithPipeline; } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -269,7 +269,7 @@ public void deleteContainer(long containerId, Pipeline pipeline, } } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -318,7 +318,7 @@ public ContainerDataProto readContainer(long containerID, return response.getContainerData(); } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -410,7 +410,7 @@ public void closeContainer(long containerId, Pipeline pipeline) ObjectStageChangeRequestProto.Stage.complete); } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index ddd01d3083..5303efd2aa 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -141,7 +141,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException { @Override public synchronized void close() { if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, false); xceiverClientManager = null; xceiverClient = null; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index b62f7b6a66..6cc0b54e7f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -113,7 +114,7 @@ public class BlockOutputStream extends OutputStream { * @param blockID block ID * @param key chunk key * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls + * @param pipeline pipeline where block will be written * @param traceID container protocol call args * @param chunkSize chunk size * @param bufferList list of byte buffers @@ -124,10 +125,10 @@ public class BlockOutputStream extends OutputStream { */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + XceiverClientManager xceiverClientManager, Pipeline pipeline, String traceID, int chunkSize, long streamBufferFlushSize, - long streamBufferMaxSize, long watchTimeout, - List bufferList, Checksum checksum) { + long streamBufferMaxSize, long watchTimeout, List bufferList, + Checksum checksum) throws IOException { this.blockID = blockID; this.key = key; this.traceID = traceID; @@ -138,7 +139,7 @@ public BlockOutputStream(BlockID blockID, String key, BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; + this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; this.streamBufferFlushSize = streamBufferFlushSize; @@ -500,7 +501,7 @@ public void close() throws IOException { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { - cleanup(); + cleanup(false); } } // clear the currentBuffer @@ -541,9 +542,9 @@ private void setIoException(Exception e) { } } - public void cleanup() { + public void cleanup(boolean invalidateClient) { if (xceiverClientManager != null) { - xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, invalidateClient); } xceiverClientManager = null; xceiverClient = null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 99817fbbbc..dde3641f05 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -311,7 +311,7 @@ public static LengthInputStream getFromOmKeyInfo( omKeyLocationInfo.getLength()); } finally { if (!success) { - xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, false); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 66e419906d..cbbfed859f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.*; @@ -31,11 +32,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,19 +108,6 @@ public KeyOutputStream() { this.checksum = new Checksum(); } - /** - * For testing purpose only. Not building output stream from blocks, but - * taking from externally. - * - * @param outputStream - * @param length - */ - @VisibleForTesting - public void addStream(OutputStream outputStream, long length) { - streamEntries.add( - new BlockOutputStreamEntry(outputStream, length, checksum)); - } - @VisibleForTesting public List getStreamEntries() { return streamEntries; @@ -213,12 +201,11 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) throws IOException { ContainerWithPipeline containerWithPipeline = scmClient .getContainerWithPipeline(subKeyInfo.getContainerID()); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(), - keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, bufferList, checksum)); + keyArgs.getKeyName(), xceiverClientManager, + containerWithPipeline.getPipeline(), requestID, chunkSize, + subKeyInfo.getLength(), streamBufferFlushSize, streamBufferMaxSize, + watchTimeout, bufferList, checksum)); } @@ -297,12 +284,14 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) current.write(b, off, writeLen); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + boolean retryFailure = checkForRetryFailure(ioe); + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe) + || retryFailure) { // for the current iteration, totalDataWritten - currentPos gives the // amount of data already written to the buffer writeLen = (int) (current.getWrittenDataLength() - currentPos); LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex); + handleException(current, currentStreamIndex, retryFailure); } else { throw ioe; } @@ -362,17 +351,19 @@ private void removeEmptyBlocks() { * * @param streamEntry StreamEntry * @param streamIndex Index of the entry + * @param retryFailure if true the xceiverClient needs to be invalidated in + * the client cache. * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - int streamIndex) throws IOException { + int streamIndex, boolean retryFailure) throws IOException { long totalSuccessfulFlushedData = streamEntry.getTotalSuccessfulFlushedData(); //set the correct length for the current stream streamEntry.currentPosition = totalSuccessfulFlushedData; long bufferedDataLen = computeBufferData(); // just clean up the current stream. - streamEntry.cleanup(); + streamEntry.cleanup(retryFailure); if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. @@ -390,7 +381,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, private boolean checkIfContainerIsClosed(IOException ioe) { if (ioe.getCause() != null) { - return checkIfContainerNotOpenOrRaftRetryFailureException(ioe) || Optional + return checkForException(ioe, ContainerNotOpenException.class) || Optional .of(ioe.getCause()) .filter(e -> e instanceof StorageContainerException) .map(e -> (StorageContainerException) e) @@ -400,13 +391,23 @@ private boolean checkIfContainerIsClosed(IOException ioe) { return false; } - private boolean checkIfContainerNotOpenOrRaftRetryFailureException( - IOException ioe) { + /** + * Checks if the provided exception signifies retry failure in ratis client. + * In case of retry failure, ratis client throws RaftRetryFailureException + * and all succeeding operations are failed with AlreadyClosedException. + */ + private boolean checkForRetryFailure(IOException ioe) { + return checkForException(ioe, RaftRetryFailureException.class, + AlreadyClosedException.class); + } + + private boolean checkForException(IOException ioe, Class... classes) { Throwable t = ioe.getCause(); while (t != null) { - if (t instanceof ContainerNotOpenException - || t instanceof RaftRetryFailureException) { - return true; + for (Class cls : classes) { + if (cls.isInstance(t)) { + return true; + } } t = t.getCause(); } @@ -469,11 +470,13 @@ private void handleFlushOrClose(boolean close) throws IOException { entry.flush(); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + boolean retryFailure = checkForRetryFailure(ioe); + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe) + || retryFailure) { // This call will allocate a new streamEntry and write the Data. // Close needs to be retried on the newly allocated streamEntry as // as well. - handleException(entry, streamIndex); + handleException(entry, streamIndex, retryFailure); handleFlushOrClose(close); } else { throw ioe; @@ -643,7 +646,7 @@ private static class BlockOutputStreamEntry extends OutputStream { private BlockID blockID; private final String key; private final XceiverClientManager xceiverClientManager; - private final XceiverClientSpi xceiverClient; + private final Pipeline pipeline; private final Checksum checksum; private final String requestId; private final int chunkSize; @@ -660,14 +663,14 @@ private static class BlockOutputStreamEntry extends OutputStream { @SuppressWarnings("parameternumber") BlockOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String requestId, int chunkSize, + Pipeline pipeline, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, long watchTimeout, List bufferList, Checksum checksum) { this.outputStream = null; this.blockID = blockID; this.key = key; this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; + this.pipeline = pipeline; this.requestId = requestId; this.chunkSize = chunkSize; @@ -680,30 +683,6 @@ private static class BlockOutputStreamEntry extends OutputStream { this.bufferList = bufferList; } - /** - * For testing purpose, taking a some random created stream instance. - * @param outputStream a existing writable output stream - * @param length the length of data to write to the stream - */ - BlockOutputStreamEntry(OutputStream outputStream, long length, - Checksum checksum) { - this.outputStream = outputStream; - this.blockID = null; - this.key = null; - this.xceiverClientManager = null; - this.xceiverClient = null; - this.requestId = null; - this.chunkSize = -1; - - this.length = length; - this.currentPosition = 0; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferList = null; - watchTimeout = 0; - this.checksum = checksum; - } - long getLength() { return length; } @@ -712,11 +691,17 @@ long getRemaining() { return length - currentPosition; } - private void checkStream() { + /** + * BlockOutputStream is initialized in this function. This makes sure that + * xceiverClient initialization is not done during preallocation and only + * done when data is written. + * @throws IOException if xceiverClient initialization fails + */ + private void checkStream() throws IOException { if (this.outputStream == null) { this.outputStream = new BlockOutputStream(blockID, key, xceiverClientManager, - xceiverClient, requestId, chunkSize, streamBufferFlushSize, + pipeline, requestId, chunkSize, streamBufferFlushSize, streamBufferMaxSize, watchTimeout, bufferList, checksum); } } @@ -781,11 +766,11 @@ long getWrittenDataLength() throws IOException { throw new IOException("Invalid Output Stream for Key: " + key); } - void cleanup() { + void cleanup(boolean invalidateClient) throws IOException { checkStream(); if (this.outputStream instanceof BlockOutputStream) { BlockOutputStream out = (BlockOutputStream) this.outputStream; - out.cleanup(); + out.cleanup(invalidateClient); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java index 78a85110d3..c4dbb8bebb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java @@ -116,6 +116,6 @@ public void testContainerStateMachineIdempotency() throws Exception { } catch (IOException ioe) { Assert.fail("Container operation failed" + ioe); } - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index e7bca5e78d..529fcc3968 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -698,7 +698,7 @@ public void testPutKeyAndGetKeyThreeNodes() Assert.assertTrue( e.getMessage().contains("on the pipeline " + pipeline.getId())); } - manager.releaseClient(clientSpi); + manager.releaseClient(clientSpi, false); } private void readKey(OzoneBucket bucket, String keyName, String data) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index ecf0d846b1..daf09c1e8d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -98,7 +98,7 @@ public void testAllocateWrite() throws Exception { ContainerProtocolCalls.readSmallFile(client, blockID, traceID); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -121,7 +121,7 @@ public void testInvalidBlockRead() throws Exception { // Try to read a Key Container Name ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, blockID, traceID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -149,7 +149,7 @@ public void testInvalidContainerRead() throws Exception { ContainerProtocolCalls.readSmallFile(client, ContainerTestHelper.getTestBlockID( nonExistContainerID), traceID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -202,7 +202,7 @@ public void testReadWriteWithBCSId() throws Exception { ContainerProtocolCalls.readSmallFile(client, blockID1, traceID); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index b4601fa4d0..7c31b14362 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -114,7 +114,7 @@ public void tesGetCommittedBlockLength() throws Exception { Assert.assertTrue( BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); Assert.assertTrue(response.getBlockLength() == data.length); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -139,7 +139,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { } catch (StorageContainerException sce) { Assert.assertTrue(sce.getMessage().contains("Unable to find the block")); } - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -180,6 +180,6 @@ public void tesPutKeyResposne() throws Exception { // This will also ensure that closing the container committed the block // on the Datanodes. Assert.assertEquals(responseBlockID, blockID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 8b35bbbb18..f28493fed8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -96,9 +96,9 @@ public void testCaching() throws IOException { Assert.assertEquals(2, client3.getRefcount()); Assert.assertEquals(2, client1.getRefcount()); Assert.assertEquals(client1, client3); - clientManager.releaseClient(client1); - clientManager.releaseClient(client2); - clientManager.releaseClient(client3); + clientManager.releaseClient(client1, false); + clientManager.releaseClient(client2, false); + clientManager.releaseClient(client3, false); } @Test @@ -140,7 +140,7 @@ public void testFreeByReference() throws IOException { // After releasing the client, this connection should be closed // and any container operations should fail - clientManager.releaseClient(client1); + clientManager.releaseClient(client1, false); String expectedMessage = "This channel is not connected."; try { @@ -152,7 +152,7 @@ public void testFreeByReference() throws IOException { Assert.assertEquals(e.getClass(), IOException.class); Assert.assertTrue(e.getMessage().contains(expectedMessage)); } - clientManager.releaseClient(client2); + clientManager.releaseClient(client2, false); } @Test @@ -171,7 +171,7 @@ public void testFreeByEviction() throws IOException { .acquireClient(container1.getPipeline()); Assert.assertEquals(1, client1.getRefcount()); - clientManager.releaseClient(client1); + clientManager.releaseClient(client1, false); Assert.assertEquals(0, client1.getRefcount()); ContainerWithPipeline container2 = storageContainerLocationClient @@ -200,6 +200,44 @@ public void testFreeByEviction() throws IOException { Assert.assertEquals(e.getClass(), IOException.class); Assert.assertTrue(e.getMessage().contains(expectedMessage)); } - clientManager.releaseClient(client2); + clientManager.releaseClient(client2, false); + } + + @Test + public void testFreeByRetryFailure() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + XceiverClientManager clientManager = new XceiverClientManager(conf); + Cache cache = + clientManager.getClientCache(); + + // client is added in cache + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(clientManager.getType(), clientManager.getFactor(), + containerOwner); + XceiverClientSpi client1 = + clientManager.acquireClient(container1.getPipeline()); + clientManager.acquireClient(container1.getPipeline()); + Assert.assertEquals(2, client1.getRefcount()); + + // client should be invalidated in the cache + clientManager.releaseClient(client1, true); + Assert.assertEquals(1, client1.getRefcount()); + Assert.assertNull(cache.getIfPresent( + container1.getContainerInfo().getPipelineID().getId().toString() + + container1.getContainerInfo().getReplicationType())); + + // new client should be added in cache + XceiverClientSpi client2 = + clientManager.acquireClient(container1.getPipeline()); + Assert.assertNotEquals(client1, client2); + Assert.assertEquals(1, client2.getRefcount()); + + // on releasing the old client the entry in cache should not be invalidated + clientManager.releaseClient(client1, true); + Assert.assertEquals(0, client1.getRefcount()); + Assert.assertNotNull(cache.getIfPresent( + container1.getContainerInfo().getPipelineID().getId().toString() + + container1.getContainerInfo().getReplicationType())); } }