From 574dcd34c0da1903d25e37dc5757642a584dc3d0 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 6 Jun 2016 16:31:23 +0800 Subject: [PATCH] Revert "Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."" This reverts commit e4450d47f19131818e1c040b6bd8d85ae8250475. --- .../java/org/apache/hadoop/ipc/Client.java | 119 ++++++++-------- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 62 ++++----- .../hadoop/util/concurrent/AsyncGet.java | 60 ++++++++ .../util/concurrent/AsyncGetFuture.java | 73 ++++++++++ .../org/apache/hadoop/ipc/TestAsyncIPC.java | 130 ++++++++++-------- .../hdfs/AsyncDistributedFileSystem.java | 24 +--- .../ClientNamenodeProtocolTranslatorPB.java | 33 +++-- 7 files changed, 313 insertions(+), 188 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 9be4649378..d1d5b17c6f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -18,46 +18,10 @@ package org.apache.hadoop.ipc; -import static org.apache.hadoop.ipc.RpcConstants.*; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.security.PrivilegedExceptionAction; -import java.util.Arrays; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.net.SocketFactory; -import javax.security.sasl.Sasl; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.CodedOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -93,14 +57,25 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.htrace.core.Span; import org.apache.htrace.core.Tracer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.CodedOutputStream; +import javax.net.SocketFactory; +import javax.security.sasl.Sasl; +import java.io.*; +import java.net.*; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; +import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -119,8 +94,8 @@ public class Client implements AutoCloseable { private static final ThreadLocal callId = new ThreadLocal(); private static final ThreadLocal retryCount = new ThreadLocal(); - private static final ThreadLocal> - RETURN_RPC_RESPONSE = new ThreadLocal<>(); + private static final ThreadLocal> ASYNC_RPC_RESPONSE + = new ThreadLocal<>(); private static final ThreadLocal asynchronousMode = new ThreadLocal() { @Override @@ -131,8 +106,8 @@ protected Boolean initialValue() { @SuppressWarnings("unchecked") @Unstable - public static Future getReturnRpcResponse() { - return (Future) RETURN_RPC_RESPONSE.get(); + public static Future getAsyncRpcResponse() { + return (Future) ASYNC_RPC_RESPONSE.get(); } /** Set call id and retry count for the next call. */ @@ -379,6 +354,11 @@ private Call(RPC.RpcKind rpcKind, Writable param) { } } + @Override + public String toString() { + return getClass().getSimpleName() + id; + } + /** Indicate when the call is complete and the * value or error are available. Notifies by default. */ protected synchronized void callComplete() { @@ -1413,27 +1393,32 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, } if (isAsynchronousMode()) { - Future returnFuture = new AbstractFuture() { - private final AtomicBoolean callled = new AtomicBoolean(false); + final AsyncGet asyncGet + = new AsyncGet() { @Override - public Writable get() throws InterruptedException, ExecutionException { - if (callled.compareAndSet(false, true)) { - try { - set(getRpcResponse(call, connection)); - } catch (IOException ie) { - setException(ie); - } finally { + public Writable get(long timeout, TimeUnit unit) + throws IOException, TimeoutException{ + boolean done = true; + try { + final Writable w = getRpcResponse(call, connection, timeout, unit); + if (w == null) { + done = false; + throw new TimeoutException(call + " timed out " + + timeout + " " + unit); + } + return w; + } finally { + if (done) { releaseAsyncCall(); } } - return super.get(); } }; - RETURN_RPC_RESPONSE.set(returnFuture); + ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet)); return null; } else { - return getRpcResponse(call, connection); + return getRpcResponse(call, connection, -1, null); } } @@ -1469,12 +1454,18 @@ int getAsyncCallCount() { return asyncCallCounter.get(); } - private Writable getRpcResponse(final Call call, final Connection connection) - throws IOException { + /** @return the rpc response or, in case of timeout, null. */ + private Writable getRpcResponse(final Call call, final Connection connection, + final long timeout, final TimeUnit unit) throws IOException { synchronized (call) { while (!call.done) { try { - call.wait(); // wait for the result + final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout( + timeout, unit); + call.wait(waitTimeout); // wait for the result + if (waitTimeout > 0 && !call.done) { + return null; + } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new InterruptedIOException("Call interrupted"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 8fcdb78bac..0f43fc6d3d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -18,21 +18,9 @@ package org.apache.hadoop.ipc; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.SocketFactory; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.*; +import com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -52,17 +40,23 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.AsyncGet; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.GeneratedMessage; -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; +import javax.net.SocketFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * RPC Engine for for protobuf based RPCs. @@ -70,8 +64,8 @@ @InterfaceStability.Evolving public class ProtobufRpcEngine implements RpcEngine { public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class); - private static final ThreadLocal> - RETURN_MESSAGE_CALLBACK = new ThreadLocal<>(); + private static final ThreadLocal> + ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( @@ -81,10 +75,9 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ClientCache CLIENTS = new ClientCache(); - @SuppressWarnings("unchecked") @Unstable - public static Callable getReturnMessageCallback() { - return (Callable) RETURN_MESSAGE_CALLBACK.get(); + public static AsyncGet getAsyncReturnMessage() { + return ASYNC_RETURN_MESSAGE.get(); } public ProtocolProxy getProxy(Class protocol, long clientVersion, @@ -263,14 +256,17 @@ public Object invoke(Object proxy, final Method method, Object[] args) } if (Client.isAsynchronousMode()) { - final Future frrw = Client.getReturnRpcResponse(); - Callable callback = new Callable() { + final Future frrw = Client.getAsyncRpcResponse(); + final AsyncGet asyncGet + = new AsyncGet() { @Override - public Message call() throws Exception { - return getReturnMessage(method, frrw.get()); + public Message get(long timeout, TimeUnit unit) throws Exception { + final RpcResponseWrapper rrw = timeout < 0? + frrw.get(): frrw.get(timeout, unit); + return getReturnMessage(method, rrw); } }; - RETURN_MESSAGE_CALLBACK.set(callback); + ASYNC_RETURN_MESSAGE.set(asyncGet); return null; } else { return getReturnMessage(method, val); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java new file mode 100644 index 0000000000..5eac869632 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util.concurrent; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This interface defines an asynchronous {@link #get(long, TimeUnit)} method. + * + * When the return value is still being computed, invoking + * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}. + * The method should be invoked again and again + * until the underlying computation is completed. + * + * @param The type of the return value. + * @param The exception type that the underlying implementation may throw. + */ +public interface AsyncGet { + /** + * Get the result. + * + * @param timeout The maximum time period to wait. + * When timeout == 0, it does not wait at all. + * When timeout < 0, it waits indefinitely. + * @param unit The unit of the timeout value + * @return the result, which is possibly null. + * @throws E an exception thrown by the underlying implementation. + * @throws TimeoutException if it cannot return after the given time period. + * @throws InterruptedException if the thread is interrupted. + */ + R get(long timeout, TimeUnit unit) + throws E, TimeoutException, InterruptedException; + + /** Utility */ + class Util { + /** + * @return {@link Object#wait(long)} timeout converted + * from {@link #get(long, TimeUnit)} timeout. + */ + public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){ + return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java new file mode 100644 index 0000000000..d6878670ff --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util.concurrent; + +import com.google.common.util.concurrent.AbstractFuture; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** A {@link Future} implemented using an {@link AsyncGet} object. */ +public class AsyncGetFuture extends AbstractFuture { + public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class); + + private final AtomicBoolean called = new AtomicBoolean(false); + private final AsyncGet asyncGet; + + public AsyncGetFuture(AsyncGet asyncGet) { + this.asyncGet = asyncGet; + } + + private void callAsyncGet(long timeout, TimeUnit unit) { + if (!isCancelled() && called.compareAndSet(false, true)) { + try { + set(asyncGet.get(timeout, unit)); + } catch (TimeoutException te) { + LOG.trace("TRACE", te); + called.compareAndSet(true, false); + } catch (Throwable e) { + LOG.trace("TRACE", e); + setException(e); + } + } + } + + @Override + public T get() throws InterruptedException, ExecutionException { + callAsyncGet(-1, TimeUnit.MILLISECONDS); + return super.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + callAsyncGet(timeout, unit); + return super.get(0, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isDone() { + callAsyncGet(0, TimeUnit.MILLISECONDS); + return super.isDone(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 8ee3a2c64d..0ad191b4fe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -18,20 +18,6 @@ package org.apache.hadoop.ipc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,6 +34,17 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + public class TestAsyncIPC { private static Configuration conf; @@ -87,26 +84,50 @@ public void run() { try { final long param = TestIPC.RANDOM.nextLong(); TestIPC.call(client, param, server, conf); - Future returnFuture = Client.getReturnRpcResponse(); - returnFutures.put(i, returnFuture); + returnFutures.put(i, Client.getAsyncRpcResponse()); expectedValues.put(i, param); } catch (Exception e) { - LOG.fatal("Caught: " + StringUtils.stringifyException(e)); failed = true; + throw new RuntimeException(e); } } } - public void waitForReturnValues() throws InterruptedException, - ExecutionException { + void assertReturnValues() throws InterruptedException, ExecutionException { for (int i = 0; i < count; i++) { LongWritable value = returnFutures.get(i).get(); - if (expectedValues.get(i) != value.get()) { - LOG.fatal(String.format("Call-%d failed!", i)); - failed = true; - break; + Assert.assertEquals("call" + i + " failed.", + expectedValues.get(i).longValue(), value.get()); + } + Assert.assertFalse(failed); + } + + void assertReturnValues(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException { + final boolean[] checked = new boolean[count]; + for(boolean done = false; !done;) { + done = true; + for (int i = 0; i < count; i++) { + if (checked[i]) { + continue; + } else { + done = false; + } + + final LongWritable value; + try { + value = returnFutures.get(i).get(timeout, unit); + } catch (TimeoutException e) { + LOG.info("call" + i + " caught ", e); + continue; + } + + Assert.assertEquals("call" + i + " failed.", + expectedValues.get(i).longValue(), value.get()); + checked[i] = true; } } + Assert.assertFalse(failed); } } @@ -183,8 +204,7 @@ private void runCall(final int idx, final long param) private void doCall(final int idx, final long param) throws IOException { TestIPC.call(client, param, server, conf); - Future returnFuture = Client.getReturnRpcResponse(); - returnFutures.put(idx, returnFuture); + returnFutures.put(idx, Client.getAsyncRpcResponse()); expectedValues.put(idx, param); } @@ -233,10 +253,7 @@ public void internalTestAsyncCall(int handlerCount, boolean handlerSleep, } for (int i = 0; i < callerCount; i++) { callers[i].join(); - callers[i].waitForReturnValues(); - String msg = String.format("Expected not failed for caller-%d: %s.", i, - callers[i]); - assertFalse(msg, callers[i].failed); + callers[i].assertReturnValues(); } for (int i = 0; i < clientCount; i++) { clients[i].stop(); @@ -258,25 +275,37 @@ public void testCallGetReturnRpcResponseMultipleTimes() throws IOException, try { AsyncCaller caller = new AsyncCaller(client, addr, callCount); caller.run(); - - caller.waitForReturnValues(); - String msg = String.format( - "First time, expected not failed for caller: %s.", caller); - assertFalse(msg, caller.failed); - - caller.waitForReturnValues(); - assertTrue(asyncCallCount == client.getAsyncCallCount()); - msg = String.format("Second time, expected not failed for caller: %s.", - caller); - assertFalse(msg, caller.failed); - - assertTrue(asyncCallCount == client.getAsyncCallCount()); + caller.assertReturnValues(); + caller.assertReturnValues(); + caller.assertReturnValues(); + Assert.assertEquals(asyncCallCount, client.getAsyncCallCount()); } finally { client.stop(); server.stop(); } } + @Test(timeout = 60000) + public void testFutureGetWithTimeout() throws IOException, + InterruptedException, ExecutionException { +// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL); + final Server server = new TestIPC.TestServer(10, true, conf); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + final Client client = new Client(LongWritable.class, conf); + + try { + final AsyncCaller caller = new AsyncCaller(client, addr, 10); + caller.run(); + caller.assertReturnValues(10, TimeUnit.MILLISECONDS); + } finally { + client.stop(); + server.stop(); + } + } + + public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException { @@ -367,9 +396,7 @@ public void run() { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 4); caller.run(); - caller.waitForReturnValues(); - String msg = String.format("Expected not failed for caller: %s.", caller); - assertFalse(msg, caller.failed); + caller.assertReturnValues(); } finally { client.stop(); server.stop(); @@ -406,9 +433,7 @@ public void run() { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 10); caller.run(); - caller.waitForReturnValues(); - String msg = String.format("Expected not failed for caller: %s.", caller); - assertFalse(msg, caller.failed); + caller.assertReturnValues(); } finally { client.stop(); server.stop(); @@ -443,9 +468,7 @@ public void run() { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 10); caller.run(); - caller.waitForReturnValues(); - String msg = String.format("Expected not failed for caller: %s.", caller); - assertFalse(msg, caller.failed); + caller.assertReturnValues(); } finally { client.stop(); server.stop(); @@ -489,10 +512,7 @@ public void run() { } for (int i = 0; i < callerCount; ++i) { callers[i].join(); - callers[i].waitForReturnValues(); - String msg = String.format("Expected not failed for caller-%d: %s.", i, - callers[i]); - assertFalse(msg, callers[i].failed); + callers[i].assertReturnValues(); } } finally { client.stop(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 4fe0861921..6bfd71d359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -19,20 +19,16 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; +import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.hadoop.ipc.Client; -import com.google.common.util.concurrent.AbstractFuture; - /**************************************************************** * Implementation of the asynchronous distributed file system. * This instance of this class is the way end-user code interacts @@ -52,22 +48,8 @@ public class AsyncDistributedFileSystem { } static Future getReturnValue() { - final Callable returnValueCallback = ClientNamenodeProtocolTranslatorPB - .getReturnValueCallback(); - Future returnFuture = new AbstractFuture() { - private final AtomicBoolean called = new AtomicBoolean(false); - public T get() throws InterruptedException, ExecutionException { - if (called.compareAndSet(false, true)) { - try { - set(returnValueCallback.call()); - } catch (Exception e) { - setException(e); - } - } - return super.get(); - } - }; - return returnFuture; + return new AsyncGetFuture<>( + ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index faa925c0c4..939c1ac140 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -24,7 +24,8 @@ import java.util.List; import com.google.common.collect.Lists; -import java.util.concurrent.Callable; + +import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -198,6 +199,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; +import org.apache.hadoop.util.concurrent.AsyncGet; /** * This class forwards NN's ClientProtocol calls as RPC calls to the NN server @@ -209,8 +211,8 @@ public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; - private static final ThreadLocal> - RETURN_VALUE_CALLBACK = new ThreadLocal<>(); + private static final ThreadLocal> + ASYNC_RETURN_VALUE = new ThreadLocal<>(); static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); @@ -246,8 +248,8 @@ public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) { @SuppressWarnings("unchecked") @Unstable - public static Callable getReturnValueCallback() { - return (Callable) RETURN_VALUE_CALLBACK.get(); + public static AsyncGet getAsyncReturnValue() { + return (AsyncGet) ASYNC_RETURN_VALUE.get(); } @Override @@ -369,7 +371,7 @@ public void setPermission(String src, FsPermission permission) try { if (Client.isAsynchronousMode()) { rpcProxy.setPermission(null, req); - setReturnValueCallback(); + setAsyncReturnValue(); } else { rpcProxy.setPermission(null, req); } @@ -378,17 +380,18 @@ public void setPermission(String src, FsPermission permission) } } - private void setReturnValueCallback() { - final Callable returnMessageCallback = ProtobufRpcEngine - .getReturnMessageCallback(); - Callable callBack = new Callable() { + private void setAsyncReturnValue() { + final AsyncGet asyncReturnMessage + = ProtobufRpcEngine.getAsyncReturnMessage(); + final AsyncGet asyncGet + = new AsyncGet() { @Override - public Void call() throws Exception { - returnMessageCallback.call(); + public Void get(long timeout, TimeUnit unit) throws Exception { + asyncReturnMessage.get(timeout, unit); return null; } }; - RETURN_VALUE_CALLBACK.set(callBack); + ASYNC_RETURN_VALUE.set(asyncGet); } @Override @@ -403,7 +406,7 @@ public void setOwner(String src, String username, String groupname) try { if (Client.isAsynchronousMode()) { rpcProxy.setOwner(null, req.build()); - setReturnValueCallback(); + setAsyncReturnValue(); } else { rpcProxy.setOwner(null, req.build()); } @@ -536,7 +539,7 @@ public void rename2(String src, String dst, Rename... options) try { if (Client.isAsynchronousMode()) { rpcProxy.rename2(null, req); - setReturnValueCallback(); + setAsyncReturnValue(); } else { rpcProxy.rename2(null, req); }