diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ec7c3967af..bded4b9951 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -405,4 +405,9 @@
+
+
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
new file mode 100644
index 0000000000..9b4cbcf0e5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public abstract class ExternalCall extends Call {
+ private final PrivilegedExceptionAction action;
+ private final AtomicBoolean done = new AtomicBoolean();
+ private T result;
+ private Throwable error;
+
+ public ExternalCall(PrivilegedExceptionAction action) {
+ this.action = action;
+ }
+
+ public abstract UserGroupInformation getRemoteUser();
+
+ public final T get() throws IOException, InterruptedException {
+ waitForCompletion();
+ if (error != null) {
+ if (error instanceof IOException) {
+ throw (IOException)error;
+ } else {
+ throw new IOException(error);
+ }
+ }
+ return result;
+ }
+
+ // wait for response to be triggered to support postponed calls
+ private void waitForCompletion() throws InterruptedException {
+ synchronized(done) {
+ while (!done.get()) {
+ try {
+ done.wait();
+ } catch (InterruptedException ie) {
+ if (Thread.interrupted()) {
+ throw ie;
+ }
+ }
+ }
+ }
+ }
+
+ boolean isDone() {
+ return done.get();
+ }
+
+ // invoked by ipc handler
+ @Override
+ public final Void run() throws IOException {
+ try {
+ result = action.run();
+ sendResponse();
+ } catch (Throwable t) {
+ abortResponse(t);
+ }
+ return null;
+ }
+
+ @Override
+ final void doResponse(Throwable t) {
+ synchronized(done) {
+ error = t;
+ done.set(true);
+ done.notify();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f509d71517..1c7e76a506 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -384,6 +384,11 @@ public static UserGroupInformation getRemoteUser() {
return (call != null) ? call.getRemoteUser() : null;
}
+ public static String getProtocol() {
+ Call call = CurCall.get();
+ return (call != null) ? call.getProtocol() : null;
+ }
+
/** Return true if the invocation was through an RPC.
*/
public static boolean isRpcInvocation() {
@@ -672,6 +677,11 @@ public static class Call implements Schedulable,
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
+ Call() {
+ this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
+ RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
+ }
+
Call(Call call) {
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
call.traceScope, call.callerContext);
@@ -703,6 +713,7 @@ public String toString() {
return "Call#" + callId + " Retry#" + retryCount;
}
+ @Override
public Void run() throws Exception {
return null;
}
@@ -718,6 +729,10 @@ public String getHostAddress() {
return (addr != null) ? addr.getHostAddress() : null;
}
+ public String getProtocol() {
+ return null;
+ }
+
/**
* Allow a IPC response to be postponed instead of sent immediately
* after the handler returns from the proxy method. The intended use
@@ -799,6 +814,11 @@ private class RpcCall extends Call {
this.rpcRequest = param;
}
+ @Override
+ public String getProtocol() {
+ return "rpc";
+ }
+
@Override
public UserGroupInformation getRemoteUser() {
return connection.user;
@@ -2333,33 +2353,15 @@ private void processRpcRequest(RpcRequestHeaderProto header,
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
- if (callQueue.isClientBackoffEnabled()) {
- // if RPC queue is full, we will ask the RPC client to back off by
- // throwing RetriableException. Whether RPC client will honor
- // RetriableException and retry depends on client ipc retry policy.
- // For example, FailoverOnNetworkExceptionRetry handles
- // RetriableException.
- queueRequestOrAskClientToBackOff(call);
- } else {
- callQueue.put(call); // queue the call; maybe blocked here
+ try {
+ queueCall(call);
+ } catch (IOException ioe) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
}
incRpcCount(); // Increment the rpc count
}
- private void queueRequestOrAskClientToBackOff(Call call)
- throws WrappedRpcServerException, InterruptedException {
- // If rpc scheduler indicates back off based on performance
- // degradation such as response time or rpc queue is full,
- // we will ask the client to back off.
- if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
- rpcMetrics.incrClientBackoff();
- RetriableException retriableException =
- new RetriableException("Server is too busy.");
- throw new WrappedRpcServerExceptionSuppressed(
- RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
- }
- }
-
/**
* Establish RPC connection setup by negotiating SASL if required, then
* reading and authorizing the connection header
@@ -2487,6 +2489,21 @@ private synchronized void close() {
}
}
+ public void queueCall(Call call) throws IOException, InterruptedException {
+ if (!callQueue.isClientBackoffEnabled()) {
+ callQueue.put(call); // queue the call; maybe blocked here
+ } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+ // If rpc scheduler indicates back off based on performance degradation
+ // such as response time or rpc queue is full, we will ask the client
+ // to back off by throwing RetriableException. Whether the client will
+ // honor RetriableException and retry depends the client and its policy.
+ // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
+ // RetriableException.
+ rpcMetrics.incrClientBackoff();
+ throw new RetriableException("Server is too busy.");
+ }
+ }
+
/** Handles queued calls . */
private class Handler extends Thread {
public Handler(int instanceNumber) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index ff6b25e447..92d91839a1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -64,6 +64,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -926,6 +927,90 @@ public void testConnectionPing() throws Exception {
}
}
+ @Test(timeout=30000)
+ public void testExternalCall() throws Exception {
+ final UserGroupInformation ugi = UserGroupInformation
+ .createUserForTesting("user123", new String[0]);
+ final IOException expectedIOE = new IOException("boom");
+
+ // use 1 handler so the callq can be plugged
+ final Server server = setupTestServer(conf, 1);
+ try {
+ final AtomicBoolean result = new AtomicBoolean();
+
+ ExternalCall remoteUserCall = newExtCall(ugi,
+ new PrivilegedExceptionAction() {
+ @Override
+ public String run() throws Exception {
+ return UserGroupInformation.getCurrentUser().getUserName();
+ }
+ });
+
+ ExternalCall exceptionCall = newExtCall(ugi,
+ new PrivilegedExceptionAction() {
+ @Override
+ public String run() throws Exception {
+ throw expectedIOE;
+ }
+ });
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ ExternalCall barrierCall = newExtCall(ugi,
+ new PrivilegedExceptionAction() {
+ @Override
+ public Void run() throws Exception {
+ // notify we are in a handler and then wait to keep the callq
+ // plugged up
+ latch.countDown();
+ barrier.await();
+ return null;
+ }
+ });
+
+ server.queueCall(barrierCall);
+ server.queueCall(exceptionCall);
+ server.queueCall(remoteUserCall);
+
+ // wait for barrier call to enter the handler, check that the other 2
+ // calls are actually queued
+ latch.await();
+ assertEquals(2, server.getCallQueueLen());
+
+ // unplug the callq
+ barrier.await();
+ barrierCall.get();
+
+ // verify correct ugi is used
+ String answer = remoteUserCall.get();
+ assertEquals(ugi.getUserName(), answer);
+
+ try {
+ exceptionCall.get();
+ fail("didn't throw");
+ } catch (IOException ioe) {
+ assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ private ExternalCall newExtCall(UserGroupInformation ugi,
+ PrivilegedExceptionAction callable) {
+ return new ExternalCall(callable) {
+ @Override
+ public String getProtocol() {
+ return "test";
+ }
+ @Override
+ public UserGroupInformation getRemoteUser() {
+ return ugi;
+ }
+ };
+ }
+
@Test
public void testRpcMetrics() throws Exception {
Server server;