From 29a1ba1e8fb11432404cea49e46eef47c36bb70a Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Thu, 24 Dec 2009 00:47:20 +0000 Subject: [PATCH] HADOOP-6460. Reinitializes buffers used for serializing responses in ipc server on exceeding maximum response size to free up Java heap. Contributed by Suresh Srinivas. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@893666 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 ++++ src/java/org/apache/hadoop/ipc/Server.java | 17 +++++++++++++++-- .../hadoop/ipc/TestIPCServerResponder.java | 6 ++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 36e2ab2f32..c5cca75cfc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1267,6 +1267,10 @@ Release 0.20.2 - Unreleased HADOOP-6269. Fix threading issue with defaultResource in Configuration. (Sreekanth Ramakrishnan via cdouglas) + HADOOP-6460. Reinitializes buffers used for serializing responses in ipc + server on exceeding maximum response size to free up Java heap. (suresh) + + Release 0.20.1 - 2009-09-01 INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index 5bd0e8363d..02e78b55ce 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -91,6 +91,12 @@ public abstract class Server { */ private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; + /** + * Initial and max size of response buffer + */ + static int INITIAL_RESP_BUF_SIZE = 10240; + static int MAX_RESP_BUF_SIZE = 1024*1024; + public static final Log LOG = LogFactory.getLog(Server.class); private static final ThreadLocal SERVER = new ThreadLocal(); @@ -944,7 +950,8 @@ public Handler(int instanceNumber) { public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); - ByteArrayOutputStream buf = new ByteArrayOutputStream(10240); + ByteArrayOutputStream buf = + new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { try { final Call call = callQueue.take(); // pop the queue; maybe blocked here @@ -985,10 +992,16 @@ public Writable run() throws Exception { error = StringUtils.stringifyException(e); } CurCall.set(null); - setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); + // Discard the large buf and reset it back to + // smaller size to freeup heap + if (buf.size() > MAX_RESP_BUF_SIZE) { + LOG.warn("Large response size " + buf.size() + " for call " + + call.toString()); + buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + } responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it diff --git a/src/test/core/org/apache/hadoop/ipc/TestIPCServerResponder.java b/src/test/core/org/apache/hadoop/ipc/TestIPCServerResponder.java index 2591da0143..e1370fe866 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/src/test/core/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -114,6 +114,12 @@ public void run() { } } + public void testResponseBuffer() throws Exception { + Server.INITIAL_RESP_BUF_SIZE = 1; + Server.MAX_RESP_BUF_SIZE = 1; + testServerResponder(1, true, 1, 1, 5); + } + public void testServerResponder() throws Exception { testServerResponder(10, true, 1, 10, 200); }