HDDS-555. RandomKeyGenerator runs are not closing the XceiverClient properly. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-10-12 10:36:30 -07:00
parent f63ee083db
commit e12edb3d8f
7 changed files with 37 additions and 51 deletions

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable {
//TODO : change this to SCM configuration class //TODO : change this to SCM configuration class
private final Configuration conf; private final Configuration conf;
private final Cache<Long, XceiverClientSpi> clientCache; private final Cache<PipelineID, XceiverClientSpi> clientCache;
private final boolean useRatis; private final boolean useRatis;
private static XceiverClientMetrics metrics; private static XceiverClientMetrics metrics;
@ -82,10 +83,10 @@ public XceiverClientManager(Configuration conf) {
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize) .maximumSize(maxSize)
.removalListener( .removalListener(
new RemovalListener<Long, XceiverClientSpi>() { new RemovalListener<PipelineID, XceiverClientSpi>() {
@Override @Override
public void onRemoval( public void onRemoval(
RemovalNotification<Long, XceiverClientSpi> RemovalNotification<PipelineID, XceiverClientSpi>
removalNotification) { removalNotification) {
synchronized (clientCache) { synchronized (clientCache) {
// Mark the entry as evicted // Mark the entry as evicted
@ -97,7 +98,7 @@ public void onRemoval(
} }
@VisibleForTesting @VisibleForTesting
public Cache<Long, XceiverClientSpi> getClientCache() { public Cache<PipelineID, XceiverClientSpi> getClientCache() {
return clientCache; return clientCache;
} }
@ -112,14 +113,14 @@ public Cache<Long, XceiverClientSpi> getClientCache() {
* @return XceiverClientSpi connected to a container * @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired * @throws IOException if a XceiverClientSpi cannot be acquired
*/ */
public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID) public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException { throws IOException {
Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
synchronized (clientCache) { synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline, containerID); XceiverClientSpi info = getClient(pipeline);
info.incrementReference(); info.incrementReference();
return info; return info;
} }
@ -137,10 +138,10 @@ public void releaseClient(XceiverClientSpi client) {
} }
} }
private XceiverClientSpi getClient(Pipeline pipeline, long containerID) private XceiverClientSpi getClient(Pipeline pipeline)
throws IOException { throws IOException {
try { try {
return clientCache.get(containerID, return clientCache.get(pipeline.getId(),
new Callable<XceiverClientSpi>() { new Callable<XceiverClientSpi>() {
@Override @Override
public XceiverClientSpi call() throws Exception { public XceiverClientSpi call() throws Exception {

View File

@ -96,8 +96,7 @@ public ContainerWithPipeline createContainer(String owner)
xceiverClientManager.getType(), xceiverClientManager.getType(),
xceiverClientManager.getFactor(), owner); xceiverClientManager.getFactor(), owner);
Pipeline pipeline = containerWithPipeline.getPipeline(); Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline, client = xceiverClientManager.acquireClient(pipeline);
containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its // Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines // namespace. The client needs to create the pipeline on the machines
@ -207,8 +206,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
storageContainerLocationClient.allocateContainer(type, factor, storageContainerLocationClient.allocateContainer(type, factor,
owner); owner);
Pipeline pipeline = containerWithPipeline.getPipeline(); Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline, client = xceiverClientManager.acquireClient(pipeline);
containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its // Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines // namespace. The client needs to create the pipeline on the machines
@ -217,8 +215,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
createPipeline(client, pipeline); createPipeline(client, pipeline);
} }
// connect to pipeline leader and allocate container on leader datanode. // connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline, client = xceiverClientManager.acquireClient(pipeline);
containerWithPipeline.getContainerInfo().getContainerID());
createContainer(client, createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID()); containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline; return containerWithPipeline;
@ -279,7 +276,7 @@ public void deleteContainer(long containerId, Pipeline pipeline,
boolean force) throws IOException { boolean force) throws IOException {
XceiverClientSpi client = null; XceiverClientSpi client = null;
try { try {
client = xceiverClientManager.acquireClient(pipeline, containerId); client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString(); String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls ContainerProtocolCalls
.deleteContainer(client, containerId, force, traceID); .deleteContainer(client, containerId, force, traceID);
@ -334,7 +331,7 @@ public ContainerData readContainer(long containerID,
Pipeline pipeline) throws IOException { Pipeline pipeline) throws IOException {
XceiverClientSpi client = null; XceiverClientSpi client = null;
try { try {
client = xceiverClientManager.acquireClient(pipeline, containerID); client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString(); String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response = ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client, containerID, traceID); ContainerProtocolCalls.readContainer(client, containerID, traceID);
@ -421,7 +418,7 @@ public void closeContainer(long containerId, Pipeline pipeline)
For now, take the #2 way. For now, take the #2 way.
*/ */
// Actually close the container on Datanode // Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline, containerId); client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString(); String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange( storageContainerLocationClient.notifyObjectStageChange(

View File

@ -273,7 +273,7 @@ public static LengthInputStream getFromOmKeyInfo(
ContainerWithPipeline containerWithPipeline = ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.getContainerWithPipeline(containerID); storageContainerLocationClient.getContainerWithPipeline(containerID);
XceiverClientSpi xceiverClient = xceiverClientManager XceiverClientSpi xceiverClient = xceiverClientManager
.acquireClient(containerWithPipeline.getPipeline(), containerID); .acquireClient(containerWithPipeline.getPipeline());
boolean success = false; boolean success = false;
containerKey = omKeyLocationInfo.getLocalID(); containerKey = omKeyLocationInfo.getLocalID();
try { try {

View File

@ -189,8 +189,7 @@ private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
ContainerInfo container = containerWithPipeline.getContainerInfo(); ContainerInfo container = containerWithPipeline.getContainerInfo();
XceiverClientSpi xceiverClient = XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(), xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
container.getContainerID());
// create container if needed // create container if needed
if (subKeyInfo.getShouldCreateContainer()) { if (subKeyInfo.getShouldCreateContainer()) {
try { try {

View File

@ -86,8 +86,7 @@ public void testAllocateWrite() throws Exception {
xceiverClientManager.getType(), xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(), .acquireClient(container.getPipeline());
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client, ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID); container.getContainerInfo().getContainerID(), traceID);
@ -110,8 +109,7 @@ public void testInvalidBlockRead() throws Exception {
xceiverClientManager.getType(), xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(), .acquireClient(container.getPipeline());
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client, ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID); container.getContainerInfo().getContainerID(), traceID);
@ -135,8 +133,7 @@ public void testInvalidContainerRead() throws Exception {
xceiverClientManager.getType(), xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(), .acquireClient(container.getPipeline());
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client, ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID); container.getContainerInfo().getContainerID(), traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID( BlockID blockID = ContainerTestHelper.getTestBlockID(

View File

@ -91,8 +91,7 @@ public void tesGetCommittedBlockLength() throws Exception {
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID(); long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline(); Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
xceiverClientManager.acquireClient(pipeline, containerID);
//create the container //create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID); ContainerProtocolCalls.createContainer(client, containerID, traceID);
@ -128,7 +127,7 @@ public void tesGetCommittedBlockLengthWithClosedContainer()
long containerID = container.getContainerInfo().getContainerID(); long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline(); Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = XceiverClientSpi client =
xceiverClientManager.acquireClient(pipeline, containerID); xceiverClientManager.acquireClient(pipeline);
// create the container // create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID); ContainerProtocolCalls.createContainer(client, containerID, traceID);
@ -162,7 +161,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID(); long containerID = container.getContainerInfo().getContainerID();
XceiverClientSpi client = xceiverClientManager XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(), containerID); .acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client, containerID, traceID); ContainerProtocolCalls.createContainer(client, containerID, traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
@ -187,7 +186,7 @@ public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID(); long containerID = container.getContainerInfo().getContainerID();
XceiverClientSpi client = xceiverClientManager XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(), containerID); .acquireClient(container.getPipeline());
ContainerProtocolCalls ContainerProtocolCalls
.createContainer(client, containerID, traceID); .createContainer(client, containerID, traceID);
@ -223,8 +222,7 @@ public void tesPutKeyResposne() throws Exception {
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID(); long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline(); Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
xceiverClientManager.acquireClient(pipeline, containerID);
//create the container //create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID); ContainerProtocolCalls.createContainer(client, containerID, traceID);

View File

@ -20,6 +20,7 @@
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -81,21 +82,18 @@ public void testCaching() throws IOException {
.allocateContainer(clientManager.getType(), clientManager.getFactor(), .allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner); containerOwner);
XceiverClientSpi client1 = clientManager XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(), .acquireClient(container1.getPipeline());
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount()); Assert.assertEquals(1, client1.getRefcount());
ContainerWithPipeline container2 = storageContainerLocationClient ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(), .allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner); containerOwner);
XceiverClientSpi client2 = clientManager XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(), .acquireClient(container2.getPipeline());
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount()); Assert.assertEquals(1, client2.getRefcount());
XceiverClientSpi client3 = clientManager XceiverClientSpi client3 = clientManager
.acquireClient(container1.getPipeline(), .acquireClient(container1.getPipeline());
container1.getContainerInfo().getContainerID());
Assert.assertEquals(2, client3.getRefcount()); Assert.assertEquals(2, client3.getRefcount());
Assert.assertEquals(2, client1.getRefcount()); Assert.assertEquals(2, client1.getRefcount());
Assert.assertEquals(client1, client3); Assert.assertEquals(client1, client3);
@ -109,7 +107,7 @@ public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
XceiverClientManager clientManager = new XceiverClientManager(conf); XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache = Cache<PipelineID, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();
ContainerWithPipeline container1 = ContainerWithPipeline container1 =
@ -117,8 +115,7 @@ public void testFreeByReference() throws IOException {
clientManager.getType(), HddsProtos.ReplicationFactor.ONE, clientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner); containerOwner);
XceiverClientSpi client1 = clientManager XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(), .acquireClient(container1.getPipeline());
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount()); Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(container1.getPipeline(), Assert.assertEquals(container1.getPipeline(),
client1.getPipeline()); client1.getPipeline());
@ -128,14 +125,13 @@ public void testFreeByReference() throws IOException {
clientManager.getType(), clientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner); HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client2 = clientManager XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(), .acquireClient(container2.getPipeline());
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount()); Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2); Assert.assertNotEquals(client1, client2);
// least recent container (i.e containerName1) is evicted // least recent container (i.e containerName1) is evicted
XceiverClientSpi nonExistent1 = cache XceiverClientSpi nonExistent1 = cache
.getIfPresent(container1.getContainerInfo().getContainerID()); .getIfPresent(container1.getContainerInfo().getPipelineID());
Assert.assertEquals(null, nonExistent1); Assert.assertEquals(null, nonExistent1);
// However container call should succeed because of refcount on the client. // However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@ -164,7 +160,7 @@ public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
XceiverClientManager clientManager = new XceiverClientManager(conf); XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache = Cache<PipelineID, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();
ContainerWithPipeline container1 = ContainerWithPipeline container1 =
@ -172,8 +168,7 @@ public void testFreeByEviction() throws IOException {
clientManager.getType(), clientManager.getType(),
clientManager.getFactor(), containerOwner); clientManager.getFactor(), containerOwner);
XceiverClientSpi client1 = clientManager XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(), .acquireClient(container1.getPipeline());
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount()); Assert.assertEquals(1, client1.getRefcount());
clientManager.releaseClient(client1); clientManager.releaseClient(client1);
@ -183,14 +178,13 @@ public void testFreeByEviction() throws IOException {
.allocateContainer(clientManager.getType(), clientManager.getFactor(), .allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner); containerOwner);
XceiverClientSpi client2 = clientManager XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(), .acquireClient(container2.getPipeline());
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount()); Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2); Assert.assertNotEquals(client1, client2);
// now client 1 should be evicted // now client 1 should be evicted
XceiverClientSpi nonExistent = cache XceiverClientSpi nonExistent = cache
.getIfPresent(container1.getContainerInfo().getContainerID()); .getIfPresent(container1.getContainerInfo().getPipelineID());
Assert.assertEquals(null, nonExistent); Assert.assertEquals(null, nonExistent);
// Any container operation should now fail // Any container operation should now fail