From d099c1c78a9782378397e32f1ecd4330ade8c5b6 Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Fri, 19 Feb 2010 09:04:47 +0000 Subject: [PATCH] HADOOP-6572. Makes sure that SASL encryption and push to responder queue for the RPC response happens atomically. Contributed by Kan Zhang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@911748 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 +++ src/java/org/apache/hadoop/ipc/Server.java | 25 +++++++++++++--------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0f8acacacd..a5941588d2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -217,6 +217,9 @@ Trunk (unreleased changes) HADOOP-6558. Return null in HarFileSystem.getFileChecksum(..) since no checksum algorithm is implemented. (szetszwo) + HADOOP-6572. Makes sure that SASL encryption and push to responder + queue for the RPC response happens atomically. (Kan Zhang via ddas) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index cbbfc6c256..b338b00283 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -1198,17 +1198,22 @@ public Writable run() throws Exception { error = StringUtils.stringifyException(e); } CurCall.set(null); - setupResponse(buf, call, - (error == null) ? Status.SUCCESS : Status.ERROR, - value, errorClass, error); - // Discard the large buf and reset it back to - // smaller size to freeup heap - if (buf.size() > MAX_RESP_BUF_SIZE) { - LOG.warn("Large response size " + buf.size() + " for call " + - call.toString()); - buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + synchronized (call.connection.responseQueue) { + // setupResponse() needs to be sync'ed together with + // responder.doResponse() since setupResponse may use + // SASL to encrypt response data and SASL enforces + // its own message ordering. + setupResponse(buf, call, (error == null) ? Status.SUCCESS + : Status.ERROR, value, errorClass, error); + // Discard the large buf and reset it back to + // smaller size to freeup heap + if (buf.size() > MAX_RESP_BUF_SIZE) { + LOG.warn("Large response size " + buf.size() + " for call " + + call.toString()); + buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + } + responder.doRespond(call); } - responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " +