HADOOP-6460. Reinitializes buffers used for serializing responses in ipc server on exceeding maximum response size to free up Java heap. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@893666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2009-12-24 00:47:20 +00:00
parent d115e2cc3a
commit 29a1ba1e8f
3 changed files with 25 additions and 2 deletions

View File

@ -1267,6 +1267,10 @@ Release 0.20.2 - Unreleased
HADOOP-6269. Fix threading issue with defaultResource in Configuration. HADOOP-6269. Fix threading issue with defaultResource in Configuration.
(Sreekanth Ramakrishnan via cdouglas) (Sreekanth Ramakrishnan via cdouglas)
HADOOP-6460. Reinitializes buffers used for serializing responses in ipc
server on exceeding maximum response size to free up Java heap. (suresh)
Release 0.20.1 - 2009-09-01 Release 0.20.1 - 2009-09-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -91,6 +91,12 @@ public abstract class Server {
*/ */
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
/**
* Initial and max size of response buffer
*/
static int INITIAL_RESP_BUF_SIZE = 10240;
static int MAX_RESP_BUF_SIZE = 1024*1024;
public static final Log LOG = LogFactory.getLog(Server.class); public static final Log LOG = LogFactory.getLog(Server.class);
private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>(); private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
@ -944,7 +950,8 @@ public Handler(int instanceNumber) {
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
SERVER.set(Server.this); SERVER.set(Server.this);
ByteArrayOutputStream buf = new ByteArrayOutputStream(10240); ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) { while (running) {
try { try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here final Call call = callQueue.take(); // pop the queue; maybe blocked here
@ -985,10 +992,16 @@ public Writable run() throws Exception {
error = StringUtils.stringifyException(e); error = StringUtils.stringifyException(e);
} }
CurCall.set(null); CurCall.set(null);
setupResponse(buf, call, setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR, (error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error); value, errorClass, error);
// Discard the large buf and reset it back to
// smaller size to freeup heap
if (buf.size() > MAX_RESP_BUF_SIZE) {
LOG.warn("Large response size " + buf.size() + " for call " +
call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call); responder.doRespond(call);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it

View File

@ -114,6 +114,12 @@ public void run() {
} }
} }
public void testResponseBuffer() throws Exception {
Server.INITIAL_RESP_BUF_SIZE = 1;
Server.MAX_RESP_BUF_SIZE = 1;
testServerResponder(1, true, 1, 1, 5);
}
public void testServerResponder() throws Exception { public void testServerResponder() throws Exception {
testServerResponder(10, true, 1, 10, 200); testServerResponder(10, true, 1, 10, 200);
} }