diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 97624061c3..d542abc9b2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import java.io.Closeable; import java.io.IOException; @@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable { //TODO : change this to SCM configuration class private final Configuration conf; - private final Cache clientCache; + private final Cache clientCache; private final boolean useRatis; private static XceiverClientMetrics metrics; @@ -82,10 +83,10 @@ public XceiverClientManager(Configuration conf) { .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .maximumSize(maxSize) .removalListener( - new RemovalListener() { + new RemovalListener() { @Override public void onRemoval( - RemovalNotification + RemovalNotification removalNotification) { synchronized (clientCache) { // Mark the entry as evicted @@ -97,7 +98,7 @@ public void onRemoval( } @VisibleForTesting - public Cache getClientCache() { + public Cache getClientCache() { return clientCache; } @@ -112,14 +113,14 @@ public Cache getClientCache() { * @return XceiverClientSpi connected to a container * @throws IOException if a XceiverClientSpi cannot be acquired */ - public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID) + public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { Preconditions.checkNotNull(pipeline); Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); synchronized (clientCache) { - XceiverClientSpi info = getClient(pipeline, containerID); + XceiverClientSpi info = getClient(pipeline); info.incrementReference(); return info; } @@ -137,10 +138,10 @@ public void releaseClient(XceiverClientSpi client) { } } - private XceiverClientSpi getClient(Pipeline pipeline, long containerID) + private XceiverClientSpi getClient(Pipeline pipeline) throws IOException { try { - return clientCache.get(containerID, + return clientCache.get(pipeline.getId(), new Callable() { @Override public XceiverClientSpi call() throws Exception { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index fed589c81d..c2bfb42d85 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -96,8 +96,7 @@ public ContainerWithPipeline createContainer(String owner) xceiverClientManager.getType(), xceiverClientManager.getFactor(), owner); Pipeline pipeline = containerWithPipeline.getPipeline(); - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); + client = xceiverClientManager.acquireClient(pipeline); // Allocated State means that SCM has allocated this pipeline in its // namespace. The client needs to create the pipeline on the machines @@ -207,8 +206,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, storageContainerLocationClient.allocateContainer(type, factor, owner); Pipeline pipeline = containerWithPipeline.getPipeline(); - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); + client = xceiverClientManager.acquireClient(pipeline); // Allocated State means that SCM has allocated this pipeline in its // namespace. The client needs to create the pipeline on the machines @@ -217,8 +215,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, createPipeline(client, pipeline); } // connect to pipeline leader and allocate container on leader datanode. - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); + client = xceiverClientManager.acquireClient(pipeline); createContainer(client, containerWithPipeline.getContainerInfo().getContainerID()); return containerWithPipeline; @@ -279,7 +276,7 @@ public void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException { XceiverClientSpi client = null; try { - client = xceiverClientManager.acquireClient(pipeline, containerId); + client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); ContainerProtocolCalls .deleteContainer(client, containerId, force, traceID); @@ -334,7 +331,7 @@ public ContainerData readContainer(long containerID, Pipeline pipeline) throws IOException { XceiverClientSpi client = null; try { - client = xceiverClientManager.acquireClient(pipeline, containerID); + client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); ReadContainerResponseProto response = ContainerProtocolCalls.readContainer(client, containerID, traceID); @@ -421,7 +418,7 @@ public void closeContainer(long containerId, Pipeline pipeline) For now, take the #2 way. */ // Actually close the container on Datanode - client = xceiverClientManager.acquireClient(pipeline, containerId); + client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); storageContainerLocationClient.notifyObjectStageChange( diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 2f17035036..779161360f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -273,7 +273,7 @@ public static LengthInputStream getFromOmKeyInfo( ContainerWithPipeline containerWithPipeline = storageContainerLocationClient.getContainerWithPipeline(containerID); XceiverClientSpi xceiverClient = xceiverClientManager - .acquireClient(containerWithPipeline.getPipeline(), containerID); + .acquireClient(containerWithPipeline.getPipeline()); boolean success = false; containerKey = omKeyLocationInfo.getLocalID(); try { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index de666ceadf..0a38a5aaa4 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -189,8 +189,7 @@ private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo) ContainerInfo container = containerWithPipeline.getContainerInfo(); XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(), - container.getContainerID()); + xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); // create container if needed if (subKeyInfo.getShouldCreateContainer()) { try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 84a40283aa..90dc2c47b0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -86,8 +86,7 @@ public void testAllocateWrite() throws Exception { xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE, containerOwner); XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), - container.getContainerInfo().getContainerID()); + .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, container.getContainerInfo().getContainerID(), traceID); @@ -110,8 +109,7 @@ public void testInvalidBlockRead() throws Exception { xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE, containerOwner); XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), - container.getContainerInfo().getContainerID()); + .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, container.getContainerInfo().getContainerID(), traceID); @@ -135,8 +133,7 @@ public void testInvalidContainerRead() throws Exception { xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE, containerOwner); XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), - container.getContainerInfo().getContainerID()); + .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, container.getContainerInfo().getContainerID(), traceID); BlockID blockID = ContainerTestHelper.getTestBlockID( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 42047aaaec..03a0b8a589 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -91,8 +91,7 @@ public void tesGetCommittedBlockLength() throws Exception { HddsProtos.ReplicationFactor.ONE, containerOwner); long containerID = container.getContainerInfo().getContainerID(); Pipeline pipeline = container.getPipeline(); - XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline, containerID); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); //create the container ContainerProtocolCalls.createContainer(client, containerID, traceID); @@ -128,7 +127,7 @@ public void tesGetCommittedBlockLengthWithClosedContainer() long containerID = container.getContainerInfo().getContainerID(); Pipeline pipeline = container.getPipeline(); XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline, containerID); + xceiverClientManager.acquireClient(pipeline); // create the container ContainerProtocolCalls.createContainer(client, containerID, traceID); @@ -162,7 +161,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { HddsProtos.ReplicationFactor.ONE, containerOwner); long containerID = container.getContainerInfo().getContainerID(); XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), containerID); + .acquireClient(container.getPipeline()); ContainerProtocolCalls.createContainer(client, containerID, traceID); BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); @@ -187,7 +186,7 @@ public void testGetCommittedBlockLengthForOpenBlock() throws Exception { HddsProtos.ReplicationFactor.ONE, containerOwner); long containerID = container.getContainerInfo().getContainerID(); XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), containerID); + .acquireClient(container.getPipeline()); ContainerProtocolCalls .createContainer(client, containerID, traceID); @@ -223,8 +222,7 @@ public void tesPutKeyResposne() throws Exception { HddsProtos.ReplicationFactor.ONE, containerOwner); long containerID = container.getContainerInfo().getContainerID(); Pipeline pipeline = container.getPipeline(); - XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline, containerID); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); //create the container ContainerProtocolCalls.createContainer(client, containerID, traceID); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 0d363de343..da445bfa9d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -20,6 +20,7 @@ import com.google.common.cache.Cache; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -81,21 +82,18 @@ public void testCaching() throws IOException { .allocateContainer(clientManager.getType(), clientManager.getFactor(), containerOwner); XceiverClientSpi client1 = clientManager - .acquireClient(container1.getPipeline(), - container1.getContainerInfo().getContainerID()); + .acquireClient(container1.getPipeline()); Assert.assertEquals(1, client1.getRefcount()); ContainerWithPipeline container2 = storageContainerLocationClient .allocateContainer(clientManager.getType(), clientManager.getFactor(), containerOwner); XceiverClientSpi client2 = clientManager - .acquireClient(container2.getPipeline(), - container2.getContainerInfo().getContainerID()); + .acquireClient(container2.getPipeline()); Assert.assertEquals(1, client2.getRefcount()); XceiverClientSpi client3 = clientManager - .acquireClient(container1.getPipeline(), - container1.getContainerInfo().getContainerID()); + .acquireClient(container1.getPipeline()); Assert.assertEquals(2, client3.getRefcount()); Assert.assertEquals(2, client1.getRefcount()); Assert.assertEquals(client1, client3); @@ -109,7 +107,7 @@ public void testFreeByReference() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache cache = + Cache cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -117,8 +115,7 @@ public void testFreeByReference() throws IOException { clientManager.getType(), HddsProtos.ReplicationFactor.ONE, containerOwner); XceiverClientSpi client1 = clientManager - .acquireClient(container1.getPipeline(), - container1.getContainerInfo().getContainerID()); + .acquireClient(container1.getPipeline()); Assert.assertEquals(1, client1.getRefcount()); Assert.assertEquals(container1.getPipeline(), client1.getPipeline()); @@ -128,14 +125,13 @@ public void testFreeByReference() throws IOException { clientManager.getType(), HddsProtos.ReplicationFactor.ONE, containerOwner); XceiverClientSpi client2 = clientManager - .acquireClient(container2.getPipeline(), - container2.getContainerInfo().getContainerID()); + .acquireClient(container2.getPipeline()); Assert.assertEquals(1, client2.getRefcount()); Assert.assertNotEquals(client1, client2); // least recent container (i.e containerName1) is evicted XceiverClientSpi nonExistent1 = cache - .getIfPresent(container1.getContainerInfo().getContainerID()); + .getIfPresent(container1.getContainerInfo().getPipelineID()); Assert.assertEquals(null, nonExistent1); // However container call should succeed because of refcount on the client. String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); @@ -164,7 +160,7 @@ public void testFreeByEviction() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache cache = + Cache cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -172,8 +168,7 @@ public void testFreeByEviction() throws IOException { clientManager.getType(), clientManager.getFactor(), containerOwner); XceiverClientSpi client1 = clientManager - .acquireClient(container1.getPipeline(), - container1.getContainerInfo().getContainerID()); + .acquireClient(container1.getPipeline()); Assert.assertEquals(1, client1.getRefcount()); clientManager.releaseClient(client1); @@ -183,14 +178,13 @@ public void testFreeByEviction() throws IOException { .allocateContainer(clientManager.getType(), clientManager.getFactor(), containerOwner); XceiverClientSpi client2 = clientManager - .acquireClient(container2.getPipeline(), - container2.getContainerInfo().getContainerID()); + .acquireClient(container2.getPipeline()); Assert.assertEquals(1, client2.getRefcount()); Assert.assertNotEquals(client1, client2); // now client 1 should be evicted XceiverClientSpi nonExistent = cache - .getIfPresent(container1.getContainerInfo().getContainerID()); + .getIfPresent(container1.getContainerInfo().getPipelineID()); Assert.assertEquals(null, nonExistent); // Any container operation should now fail