diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8023e13292..50db65db89 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -340,6 +340,8 @@ Trunk (Unreleased) HADOOP-9264. Port change to use Java untar API on Windows from branch-1-win to trunk. (Chris Nauroth via suresh) + HADOOP-9393. TestRPC fails with JDK7. (Andrew Wang via atm) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index a4e915a30a..f65fcfcadf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -834,23 +834,27 @@ public void testRPCInterruptedSimple() throws Exception { TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null ); server.start(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - - final TestProtocol proxy = (TestProtocol) RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - // Connect to the server - proxy.ping(); - // Interrupt self, try another call - Thread.currentThread().interrupt(); try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + + final TestProtocol proxy = (TestProtocol) RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + // Connect to the server proxy.ping(); - fail("Interruption did not cause IPC to fail"); - } catch (IOException ioe) { - if (!ioe.toString().contains("InterruptedException")) { - throw ioe; + // Interrupt self, try another call + Thread.currentThread().interrupt(); + try { + proxy.ping(); + fail("Interruption did not cause IPC to fail"); + } catch (IOException ioe) { + if (!ioe.toString().contains("InterruptedException")) { + throw ioe; + } + // clear interrupt status for future tests + Thread.interrupted(); } - // clear interrupt status for future tests - Thread.interrupted(); + } finally { + server.stop(); } } @@ -862,59 +866,62 @@ TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null ); server.start(); - - int numConcurrentRPC = 200; - InetSocketAddress addr = NetUtils.getConnectAddress(server); - final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC); - final CountDownLatch latch = new CountDownLatch(numConcurrentRPC); - final AtomicBoolean leaderRunning = new AtomicBoolean(true); - final AtomicReference error = new AtomicReference(); - Thread leaderThread = null; - - for (int i = 0; i < numConcurrentRPC; i++) { - final int num = i; - final TestProtocol proxy = (TestProtocol) RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - Thread rpcThread = new Thread(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - while (num == 0 || leaderRunning.get()) { + try { + int numConcurrentRPC = 200; + InetSocketAddress addr = NetUtils.getConnectAddress(server); + final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC); + final CountDownLatch latch = new CountDownLatch(numConcurrentRPC); + final AtomicBoolean leaderRunning = new AtomicBoolean(true); + final AtomicReference error = new AtomicReference(); + Thread leaderThread = null; + + for (int i = 0; i < numConcurrentRPC; i++) { + final int num = i; + final TestProtocol proxy = (TestProtocol) RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + Thread rpcThread = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + while (num == 0 || leaderRunning.get()) { + proxy.slowPing(false); + } + proxy.slowPing(false); + } catch (Exception e) { + if (num == 0) { + leaderRunning.set(false); + } else { + error.set(e); + } + + LOG.error(e); + } finally { + latch.countDown(); } - - proxy.slowPing(false); - } catch (Exception e) { - if (num == 0) { - leaderRunning.set(false); - } else { - error.set(e); - } - - LOG.error(e); - } finally { - latch.countDown(); } + }); + rpcThread.start(); + + if (leaderThread == null) { + leaderThread = rpcThread; } - }); - rpcThread.start(); - - if (leaderThread == null) { - leaderThread = rpcThread; } + // let threads get past the barrier + Thread.sleep(1000); + // stop a single thread + while (leaderRunning.get()) { + leaderThread.interrupt(); + } + + latch.await(); + + // should not cause any other thread to get an error + assertTrue("rpc got exception " + error.get(), error.get() == null); + } finally { + server.stop(); } - // let threads get past the barrier - Thread.sleep(1000); - // stop a single thread - while (leaderRunning.get()) { - leaderThread.interrupt(); - } - - latch.await(); - - // should not cause any other thread to get an error - assertTrue("rpc got exception " + error.get(), error.get() == null); } public static void main(String[] args) throws Exception {