HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop (Tsuyoshi OZAWA via Colin Patrick McCabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1502696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0880be20ac
commit
a038ec6ceb
@ -305,6 +305,9 @@ Release 2.2.0 - UNRELEASED
|
|||||||
HADOOP-9417. Support for symlink resolution in LocalFileSystem /
|
HADOOP-9417. Support for symlink resolution in LocalFileSystem /
|
||||||
RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe)
|
RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe)
|
||||||
|
|
||||||
|
HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop.
|
||||||
|
(Tsuyoshi OZAWA vi Colin Patrick McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -118,17 +118,70 @@ public class Client {
|
|||||||
private final byte[] clientId;
|
private final byte[] clientId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor on which IPC calls' parameters are sent. Deferring
|
* Executor on which IPC calls' parameters are sent.
|
||||||
* the sending of parameters to a separate thread isolates them
|
* Deferring the sending of parameters to a separate
|
||||||
* from thread interruptions in the calling code.
|
* thread isolates them from thread interruptions in the
|
||||||
|
* calling code.
|
||||||
*/
|
*/
|
||||||
private static final ExecutorService SEND_PARAMS_EXECUTOR =
|
private final ExecutorService sendParamsExecutor;
|
||||||
Executors.newCachedThreadPool(
|
private final static ClientExecutorServiceFactory clientExcecutorFactory =
|
||||||
new ThreadFactoryBuilder()
|
new ClientExecutorServiceFactory();
|
||||||
.setDaemon(true)
|
|
||||||
.setNameFormat("IPC Parameter Sending Thread #%d")
|
|
||||||
.build());
|
|
||||||
|
|
||||||
|
private static class ClientExecutorServiceFactory {
|
||||||
|
private int executorRefCount = 0;
|
||||||
|
private ExecutorService clientExecutor = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Executor on which IPC calls' parameters are sent.
|
||||||
|
* If the internal reference counter is zero, this method
|
||||||
|
* creates the instance of Executor. If not, this method
|
||||||
|
* just returns the reference of clientExecutor.
|
||||||
|
*
|
||||||
|
* @return An ExecutorService instance
|
||||||
|
*/
|
||||||
|
synchronized ExecutorService refAndGetInstance() {
|
||||||
|
if (executorRefCount == 0) {
|
||||||
|
clientExecutor = Executors.newCachedThreadPool(
|
||||||
|
new ThreadFactoryBuilder()
|
||||||
|
.setDaemon(true)
|
||||||
|
.setNameFormat("IPC Parameter Sending Thread #%d")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
executorRefCount++;
|
||||||
|
|
||||||
|
return clientExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup Executor on which IPC calls' parameters are sent.
|
||||||
|
* If reference counter is zero, this method discards the
|
||||||
|
* instance of the Executor. If not, this method
|
||||||
|
* just decrements the internal reference counter.
|
||||||
|
*
|
||||||
|
* @return An ExecutorService instance if it exists.
|
||||||
|
* Null is returned if not.
|
||||||
|
*/
|
||||||
|
synchronized ExecutorService unrefAndCleanup() {
|
||||||
|
executorRefCount--;
|
||||||
|
assert(executorRefCount >= 0);
|
||||||
|
|
||||||
|
if (executorRefCount == 0) {
|
||||||
|
clientExecutor.shutdown();
|
||||||
|
try {
|
||||||
|
if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
|
||||||
|
clientExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Interrupted while waiting for clientExecutor" +
|
||||||
|
"to stop", e);
|
||||||
|
clientExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
clientExecutor = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return clientExecutor;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* set the ping interval value in configuration
|
* set the ping interval value in configuration
|
||||||
@ -201,7 +254,7 @@ synchronized void decCount() {
|
|||||||
synchronized boolean isZeroReference() {
|
synchronized boolean isZeroReference() {
|
||||||
return refCount==0;
|
return refCount==0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that represents an RPC call
|
* Class that represents an RPC call
|
||||||
*/
|
*/
|
||||||
@ -879,7 +932,8 @@ public void sendRpcRequest(final Call call)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Serialize the call to be sent. This is done from the actual
|
// Serialize the call to be sent. This is done from the actual
|
||||||
// caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
|
// caller thread, rather than the sendParamsExecutor thread,
|
||||||
|
|
||||||
// so that if the serialization throws an error, it is reported
|
// so that if the serialization throws an error, it is reported
|
||||||
// properly. This also parallelizes the serialization.
|
// properly. This also parallelizes the serialization.
|
||||||
//
|
//
|
||||||
@ -896,7 +950,7 @@ public void sendRpcRequest(final Call call)
|
|||||||
call.rpcRequest.write(d);
|
call.rpcRequest.write(d);
|
||||||
|
|
||||||
synchronized (sendRpcRequestLock) {
|
synchronized (sendRpcRequestLock) {
|
||||||
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
|
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
@ -1092,6 +1146,7 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
|
|||||||
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||||
this.clientId = StringUtils.getUuidBytes();
|
this.clientId = StringUtils.getUuidBytes();
|
||||||
|
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1136,6 +1191,8 @@ public void stop() {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clientExcecutorFactory.unrefAndCleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -572,6 +572,24 @@ private void callAndVerify(Server server, InetSocketAddress addr,
|
|||||||
assertFalse(noChanged ^ serviceClass == serviceClass2);
|
assertFalse(noChanged ^ serviceClass == serviceClass2);
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000, expected=IOException.class)
|
||||||
|
public void testIpcAfterStopping() throws IOException, InterruptedException {
|
||||||
|
// start server
|
||||||
|
Server server = new TestServer(5, false);
|
||||||
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
// start client
|
||||||
|
Client client = new Client(LongWritable.class, conf);
|
||||||
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
|
addr, null, null, MIN_SLEEP_TIME, 0, conf);
|
||||||
|
client.stop();
|
||||||
|
|
||||||
|
// This call should throw IOException.
|
||||||
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
|
addr, null, null, MIN_SLEEP_TIME, 0, conf);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that file descriptors aren't leaked by starting
|
* Check that file descriptors aren't leaked by starting
|
||||||
|
Loading…
Reference in New Issue
Block a user