HDFS-11036. Ozone: reuse Xceiver connection. Contributed by Chen Liang.

This commit is contained in:
Anu Engineer 2017-02-28 12:15:26 -08:00 committed by Owen O'Malley
parent 754ed7d651
commit b8b72241aa
2 changed files with 106 additions and 11 deletions

View File

@ -30,6 +30,11 @@ public final class ScmConfigKeys {
"dfs.container.ipc";
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY =
"scm.container.client.idle.threshold";
public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
10000;
// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
}

View File

@ -19,27 +19,39 @@
package org.apache.hadoop.scm;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
* instances. Callers use this class to acquire an XceiverClient instance
* connected to the desired container pipeline. When done, the caller also uses
* this class to release the previously acquired XceiverClient instance.
*
* This class may evolve to implement efficient lifecycle management policies by
* caching container location information and pooling connected client instances
* for reuse without needing to reestablish a socket connection. The current
* implementation simply allocates and closes a new instance every time.
*
* This class caches connection to container for reuse purpose, such that
* accessing same container frequently will be through the same connection
* without reestablishing connection. But the connection will be closed if
* not being used for a period of time.
*/
public class XceiverClientManager {
//TODO : change this to SCM configuration class
private final Configuration conf;
private Cache<String, XceiverClientWithAccessInfo> openClient;
private final long staleThresholdMs;
/**
* Creates a new XceiverClientManager.
@ -48,13 +60,38 @@ public class XceiverClientManager {
*/
public XceiverClientManager(Configuration conf) {
Preconditions.checkNotNull(conf);
this.staleThresholdMs = conf.getTimeDuration(
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
this.conf = conf;
this.openClient = CacheBuilder.newBuilder()
.expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
.removalListener(
new RemovalListener<String, XceiverClientWithAccessInfo>() {
@Override
public void onRemoval(
RemovalNotification<String, XceiverClientWithAccessInfo>
removalNotification) {
// If the reference count is not 0, this xceiver client should not
// be evicted, add it back to the cache.
XceiverClientWithAccessInfo info = removalNotification.getValue();
if (info.hasRefence()) {
synchronized (XceiverClientManager.this.openClient) {
XceiverClientManager.this
.openClient.put(removalNotification.getKey(), info);
}
}
}
}).build();
}
/**
* Acquires a XceiverClient connected to a container capable of storing the
* specified key.
*
* If there is already a cached XceiverClient, simply return the cached
* otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
@ -63,13 +100,28 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
throw new IOException("Exception connecting XceiverClient.", e);
String containerName = pipeline.getContainerName();
XceiverClientWithAccessInfo info = openClient.getIfPresent(containerName);
if (info != null) {
// we do have this connection, add reference and return
info.incrementReference();
return info.getXceiverClient();
} else {
// connection not found, create new, add reference and return
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
throw new IOException("Exception connecting XceiverClient.", e);
}
info = new XceiverClientWithAccessInfo(xceiverClient);
info.incrementReference();
synchronized (openClient) {
openClient.put(containerName, info);
}
return xceiverClient;
}
return xceiverClient;
}
/**
@ -79,6 +131,44 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
*/
public void releaseClient(XceiverClient xceiverClient) {
Preconditions.checkNotNull(xceiverClient);
xceiverClient.close();
String containerName = xceiverClient.getPipeline().getContainerName();
XceiverClientWithAccessInfo info;
synchronized (openClient) {
info = openClient.getIfPresent(containerName);
}
Preconditions.checkNotNull(info);
info.decrementReference();
}
/**
* A helper class for caching and cleaning XceiverClient. Three parameters:
* - the actual XceiverClient object
* - a time stamp representing the most recent access (acquire or release)
* - a reference count, +1 when acquire, -1 when release
*/
private static class XceiverClientWithAccessInfo {
final private XceiverClient xceiverClient;
final private AtomicInteger referenceCount;
XceiverClientWithAccessInfo(XceiverClient xceiverClient) {
this.xceiverClient = xceiverClient;
this.referenceCount = new AtomicInteger(0);
}
void incrementReference() {
this.referenceCount.incrementAndGet();
}
void decrementReference() {
this.referenceCount.decrementAndGet();
}
boolean hasRefence() {
return this.referenceCount.get() != 0;
}
XceiverClient getXceiverClient() {
return xceiverClient;
}
}
}