diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 83e4b9ec8b..e68bfd4bbd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -60,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ThreadLocal> ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); - static { // Register the rpcRequest deserializer for WritableRpcEngine + static { // Register the rpcRequest deserializer for ProtobufRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class, new Server.ProtoBufRpcInvoker()); @@ -194,7 +194,8 @@ public Message invoke(Object proxy, final Method method, Object[] args) } if (args.length != 2) { // RpcController + Message - throw new ServiceException("Too many parameters for request. Method: [" + throw new ServiceException( + "Too many or few parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + args.length); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 3f68d6334c..a62748e6e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; @@ -26,7 +28,6 @@ import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; -import java.io.*; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -37,11 +38,12 @@ import javax.net.SocketFactory; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; @@ -54,7 +56,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -87,7 +88,7 @@ public enum RpcKind { RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - public final short value; //TODO make it private + private final short value; RpcKind(short val) { this.value = val; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index f20ba94984..531d5741dd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -237,14 +237,14 @@ private static Set addExceptions( static class RpcKindMapValue { final Class rpcRequestWrapperClass; final RpcInvoker rpcInvoker; + RpcKindMapValue (Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { this.rpcInvoker = rpcInvoker; this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - static Map rpcKindMap = new - HashMap(4); + static Map rpcKindMap = new HashMap<>(4); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 0ad9abc539..ed3a9d053e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -730,7 +730,7 @@ public static UserGroupInformation getBestUGI( * * @param user The principal name to load from the ticket * cache - * @param ticketCachePath the path to the ticket cache file + * @param ticketCache the path to the ticket cache file * * @throws IOException if the kerberos login fails */ @@ -790,7 +790,7 @@ public static UserGroupInformation getUGIFromTicketCache( /** * Create a UserGroupInformation from a Subject with Kerberos principal. * - * @param user The KerberosPrincipal to use in UGI + * @param subject The KerberosPrincipal to use in UGI * * @throws IOException if the kerberos login fails */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java index eb7b949709..9356dabe2f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java @@ -17,13 +17,8 @@ */ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.atomic.AtomicLong; - +import com.google.common.base.Joiner; +import com.google.protobuf.BlockingService; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -34,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -45,8 +39,12 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.base.Joiner; -import com.google.protobuf.BlockingService; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicLong; /** * Benchmark for protobuf RPC. @@ -68,7 +66,7 @@ private static class MyOptions { public int secondsToRun = 15; private int msgSize = 1024; public Class rpcEngine = - WritableRpcEngine.class; + ProtobufRpcEngine.class; private MyOptions(String args[]) { try { @@ -135,7 +133,7 @@ private Options buildOptions() { opts.addOption( OptionBuilder.withLongOpt("engine").hasArg(true) - .withArgName("writable|protobuf") + .withArgName("protobuf") .withDescription("engine to use") .create('e')); @@ -184,8 +182,6 @@ private void processOptions(CommandLine line, Options opts) String eng = line.getOptionValue('e'); if ("protobuf".equals(eng)) { rpcEngine = ProtobufRpcEngine.class; - } else if ("writable".equals(eng)) { - rpcEngine = WritableRpcEngine.class; } else { throw new ParseException("invalid engine: " + eng); } @@ -237,11 +233,6 @@ private Server startServer(MyOptions opts) throws IOException { server = new RPC.Builder(conf).setProtocol(TestRpcService.class) .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort()) .setNumHandlers(opts.serverThreads).setVerbose(false).build(); - } else if (opts.rpcEngine == WritableRpcEngine.class) { - server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host) - .setPort(opts.getPort()).setNumHandlers(opts.serverThreads) - .setVerbose(false).build(); } else { throw new RuntimeException("Bad engine: " + opts.rpcEngine); } @@ -399,15 +390,6 @@ public String doEcho(String msg) throws Exception { return responseProto.getMessage(); } }; - } else if (opts.rpcEngine == WritableRpcEngine.class) { - final TestProtocol proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - return new RpcServiceWrapper() { - @Override - public String doEcho(String msg) throws Exception { - return proxy.echo(msg); - } - }; } else { throw new RuntimeException("unsupported engine: " + opts.rpcEngine); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index 8b419e36d4..10e23baefe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -17,252 +17,28 @@ */ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; -import org.apache.hadoop.net.NetUtils; -import org.junit.Before; import org.junit.After; +import org.junit.Before; import org.junit.Test; -import com.google.protobuf.BlockingService; public class TestMultipleProtocolServer extends TestRpcBase { - private static InetSocketAddress addr; + private static RPC.Server server; - private static Configuration conf = new Configuration(); - - - @ProtocolInfo(protocolName="Foo") - interface Foo0 extends VersionedProtocol { - public static final long versionID = 0L; - String ping() throws IOException; - - } - - @ProtocolInfo(protocolName="Foo") - interface Foo1 extends VersionedProtocol { - public static final long versionID = 1L; - String ping() throws IOException; - String ping2() throws IOException; - } - - @ProtocolInfo(protocolName="Foo") - interface FooUnimplemented extends VersionedProtocol { - public static final long versionID = 2L; - String ping() throws IOException; - } - - interface Mixin extends VersionedProtocol{ - public static final long versionID = 0L; - void hello() throws IOException; - } - - interface Bar extends Mixin { - public static final long versionID = 0L; - int echo(int i) throws IOException; - } - - class Foo0Impl implements Foo0 { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Foo0.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public String ping() { - return "Foo0"; - } - - } - - class Foo1Impl implements Foo1 { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Foo1.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public String ping() { - return "Foo1"; - } - - @Override - public String ping2() { - return "Foo1"; - - } - - } - - - class BarImpl implements Bar { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Bar.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public int echo(int i) { - return i; - } - - @Override - public void hello() { - - - } - } @Before public void setUp() throws Exception { - // create a server with two handlers - server = new RPC.Builder(conf).setProtocol(Foo0.class) - .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); - - - // Add Protobuf server - // Create server side implementation - PBServerImpl pbServerImpl = new PBServerImpl(); - BlockingService service = TestProtobufRpcProto - .newReflectiveBlockingService(pbServerImpl); - server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, - service); - server.start(); - addr = NetUtils.getConnectAddress(server); + super.setupConf(); + + server = setupTestServer(conf, 2); } - + @After public void tearDown() throws Exception { server.stop(); } - @Test - public void test1() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); - Foo0 foo0 = (Foo0)proxy.getProxy(); - Assert.assertEquals("Foo0", foo0.ping()); - - - proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); - - - Foo1 foo1 = (Foo1)proxy.getProxy(); - Assert.assertEquals("Foo1", foo1.ping()); - Assert.assertEquals("Foo1", foo1.ping()); - - - proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); - - - Bar bar = (Bar)proxy.getProxy(); - Assert.assertEquals(99, bar.echo(99)); - - // Now test Mixin class method - - Mixin mixin = bar; - mixin.hello(); - } - - - // Server does not implement the FooUnimplemented version of protocol Foo. - // See that calls to it fail. - @Test(expected=IOException.class) - public void testNonExistingProtocol() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(FooUnimplemented.class, - FooUnimplemented.versionID, addr, conf); - - FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); - foo.ping(); - } - - /** - * getProtocolVersion of an unimplemented version should return highest version - * Similarly getProtocolSignature should work. - * @throws IOException - */ - @Test - public void testNonExistingProtocol2() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(FooUnimplemented.class, - FooUnimplemented.versionID, addr, conf); - - FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); - Assert.assertEquals(Foo1.versionID, - foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), - FooUnimplemented.versionID)); - foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), - FooUnimplemented.versionID, 0); - } - - @Test(expected=IOException.class) - public void testIncorrectServerCreation() throws IOException { - new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false) - .build(); - } - // Now test a PB service - a server hosts both PB and Writable Rpcs. @Test public void testPBService() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java index 969f728f77..6d83d7d368 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java @@ -25,19 +25,6 @@ public class TestRPCCallBenchmark { - @Test(timeout=20000) - public void testBenchmarkWithWritable() throws Exception { - int rc = ToolRunner.run(new RPCCallBenchmark(), - new String[] { - "--clientThreads", "30", - "--serverThreads", "30", - "--time", "5", - "--serverReaderThreads", "4", - "--messageSize", "1024", - "--engine", "writable"}); - assertEquals(0, rc); - } - @Test(timeout=20000) public void testBenchmarkWithProto() throws Exception { int rc = ToolRunner.run(new RPCCallBenchmark(), diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index 2ac2be990d..a06d9fdc01 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -18,27 +18,19 @@ package org.apache.hadoop.ipc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import org.junit.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; -import org.apache.hadoop.net.NetUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; /** Unit test for supporting method-name based compatible RPCs. */ public class TestRPCCompatibility { @@ -49,7 +41,7 @@ public class TestRPCCompatibility { public static final Log LOG = LogFactory.getLog(TestRPCCompatibility.class); - + private static Configuration conf = new Configuration(); public interface TestProtocol0 extends VersionedProtocol { @@ -120,6 +112,21 @@ public long getProtocolVersion(String protocol, @Before public void setUp() { ProtocolSignature.resetCache(); + + RPC.setProtocolEngine(conf, + TestProtocol0.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol1.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol2.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol3.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol4.class, ProtobufRpcEngine.class); } @After @@ -133,117 +140,7 @@ public void tearDown() { server = null; } } - - @Test // old client vs new server - public void testVersion0ClientVersion1Server() throws Exception { - // create a server with two handlers - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - proxy = RPC.getProtocolProxy( - TestProtocol0.class, TestProtocol0.versionID, addr, conf); - - TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy(); - proxy0.ping(); - } - - @Test // old client vs new server - public void testVersion1ClientVersion0Server() throws Exception { - // create a server with two handlers - server = new RPC.Builder(conf).setProtocol(TestProtocol0.class) - .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - proxy = RPC.getProtocolProxy( - TestProtocol1.class, TestProtocol1.versionID, addr, conf); - - TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy(); - proxy1.ping(); - try { - proxy1.echo("hello"); - fail("Echo should fail"); - } catch(IOException e) { - } - } - - private class Version2Client { - - private TestProtocol2 proxy2; - private ProtocolProxy serverInfo; - - private Version2Client() throws IOException { - serverInfo = RPC.getProtocolProxy( - TestProtocol2.class, TestProtocol2.versionID, addr, conf); - proxy2 = serverInfo.getProxy(); - } - - public int echo(int value) throws IOException, NumberFormatException { - if (serverInfo.isMethodSupported("echo", int.class)) { -System.out.println("echo int is supported"); - return -value; // use version 3 echo long - } else { // server is version 2 -System.out.println("echo int is NOT supported"); - return Integer.parseInt(proxy2.echo(String.valueOf(value))); - } - } - - public String echo(String value) throws IOException { - return proxy2.echo(value); - } - - public void ping() throws IOException { - proxy2.ping(); - } - } - - @Test // Compatible new client & old server - public void testVersion2ClientVersion1Server() throws Exception { - // create a server with two handlers - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - - - Version2Client client = new Version2Client(); - client.ping(); - assertEquals("hello", client.echo("hello")); - - // echo(int) is not supported by server, so returning 3 - // This verifies that echo(int) and echo(String)'s hash codes are different - assertEquals(3, client.echo(3)); - } - - @Test // equal version client and server - public void testVersion2ClientVersion2Server() throws Exception { - // create a server with two handlers - TestImpl2 impl = new TestImpl2(); - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - - Version2Client client = new Version2Client(); - - client.ping(); - assertEquals("hello", client.echo("hello")); - - // now that echo(int) is supported by the server, echo(int) should return -3 - assertEquals(-3, client.echo(3)); - } - public interface TestProtocol3 { int echo(String value); int echo(int value); @@ -297,97 +194,4 @@ public interface TestProtocol4 extends TestProtocol2 { @Override int echo(int value) throws IOException; } - - @Test - public void testVersionMismatch() throws IOException { - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class, - TestProtocol4.versionID, addr, conf); - try { - proxy.echo(21); - fail("The call must throw VersionMismatch exception"); - } catch (RemoteException ex) { - Assert.assertEquals(RPC.VersionMismatch.class.getName(), - ex.getClassName()); - Assert.assertTrue(ex.getErrorCode().equals( - RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH)); - } catch (IOException ex) { - fail("Expected version mismatch but got " + ex); - } - } - - @Test - public void testIsMethodSupported() throws IOException { - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, - TestProtocol2.versionID, addr, conf); - boolean supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, - RPC.getProtocolVersion(TestProtocol2.class), "echo"); - Assert.assertTrue(supported); - supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, - RPC.getProtocolVersion(TestProtocol2.class), "echo"); - Assert.assertFalse(supported); - } - - /** - * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up - * the server registry to extract protocol signatures and versions. - */ - @Test - public void testProtocolMetaInfoSSTranslatorPB() throws Exception { - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - - ProtocolMetaInfoServerSideTranslatorPB xlator = - new ProtocolMetaInfoServerSideTranslatorPB(server); - - GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( - null, - createGetProtocolSigRequestProto(TestProtocol1.class, - RPC.RpcKind.RPC_PROTOCOL_BUFFER)); - //No signatures should be found - Assert.assertEquals(0, resp.getProtocolSignatureCount()); - resp = xlator.getProtocolSignature( - null, - createGetProtocolSigRequestProto(TestProtocol1.class, - RPC.RpcKind.RPC_WRITABLE)); - Assert.assertEquals(1, resp.getProtocolSignatureCount()); - ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); - Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); - boolean found = false; - int expected = ProtocolSignature.getFingerprint(TestProtocol1.class - .getMethod("echo", String.class)); - for (int m : sig.getMethodsList()) { - if (expected == m) { - found = true; - break; - } - } - Assert.assertTrue(found); - } - - private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( - Class protocol, RPC.RpcKind rpcKind) { - GetProtocolSignatureRequestProto.Builder builder = - GetProtocolSignatureRequestProto.newBuilder(); - builder.setProtocol(protocol.getName()); - builder.setRpcKind(rpcKind.toString()); - return builder.build(); - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java index 5807998a15..d810fe3c5a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java @@ -18,9 +18,8 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; -import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,30 +29,39 @@ import java.net.InetSocketAddress; import java.nio.channels.ClosedByInterruptException; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; + /** * tests that the proxy can be interrupted */ -public class TestRPCWaitForProxy extends Assert { - private static final String ADDRESS = "0.0.0.0"; +public class TestRPCWaitForProxy extends TestRpcBase { private static final Logger LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); private static final Configuration conf = new Configuration(); + @Before + public void setupProtocolEngine() { + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + } + /** * This tests that the time-bounded wait for a proxy operation works, and * times out. * * @throws Throwable any exception other than that which was expected */ - @Test(timeout = 10000) + @Test(timeout = 50000) public void testWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(0); worker.start(); worker.join(); Throwable caught = worker.getCaught(); - assertNotNull("No exception was raised", caught); - if (!(caught instanceof ConnectException)) { + Throwable cause = caught.getCause(); + Assert.assertNotNull("No exception was raised", cause); + if (!(cause instanceof ConnectException)) { throw caught; } } @@ -69,11 +77,11 @@ public void testInterruptedWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(100); worker.start(); Thread.sleep(1000); - assertTrue("worker hasn't started", worker.waitStarted); + Assert.assertTrue("worker hasn't started", worker.waitStarted); worker.interrupt(); worker.join(); Throwable caught = worker.getCaught(); - assertNotNull("No exception was raised", caught); + Assert.assertNotNull("No exception was raised", caught); // looking for the root cause here, which can be wrapped // as part of the NetUtils work. Having this test look // a the type of exception there would be brittle to improvements @@ -82,6 +90,8 @@ public void testInterruptedWaitForProxy() throws Throwable { if (cause == null) { // no inner cause, use outer exception as root cause. cause = caught; + } else if (cause.getCause() != null) { + cause = cause.getCause(); } if (!(cause instanceof InterruptedIOException) && !(cause instanceof ClosedByInterruptException)) { @@ -112,12 +122,16 @@ public void run() { IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, connectRetries); waitStarted = true; - TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, - TestProtocol.versionID, - new InetSocketAddress(ADDRESS, 20), - config, - 15000L); - proxy.echo(""); + + short invalidPort = 20; + InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, + invalidPort); + TestRpcBase.TestRpcService proxy = RPC.getProxy( + TestRpcBase.TestRpcService.class, + 1L, invalidAddress, conf); + // Test echo method + proxy.echo(null, newEchoRequest("hello")); + } catch (Throwable throwable) { caught = throwable; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index e991405f0c..3c5885f4e4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -109,7 +109,8 @@ protected static RPC.Server setupTestServer(Configuration serverConf, return setupTestServer(builder); } - protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException { + protected static RPC.Server setupTestServer( + RPC.Builder builder) throws IOException { RPC.Server server = builder.build(); server.start(); @@ -190,17 +191,21 @@ public static class TestTokenIdentifier extends TokenIdentifier { public TestTokenIdentifier() { this(new Text(), new Text()); } + public TestTokenIdentifier(Text tokenid) { this(tokenid, new Text()); } + public TestTokenIdentifier(Text tokenid, Text realUser) { this.tokenid = tokenid == null ? new Text() : tokenid; this.realUser = realUser == null ? new Text() : realUser; } + @Override public Text getKind() { return KIND_NAME; } + @Override public UserGroupInformation getUser() { if (realUser.toString().isEmpty()) { @@ -218,6 +223,7 @@ public void readFields(DataInput in) throws IOException { tokenid.readFields(in); realUser.readFields(in); } + @Override public void write(DataOutput out) throws IOException { tokenid.write(out); @@ -249,7 +255,7 @@ public static class TestTokenSelector implements @SuppressWarnings("unchecked") @Override public Token selectToken(Text service, - Collection> tokens) { + Collection> tokens) { if (service == null) { return null; } @@ -403,19 +409,17 @@ public TestProtos.AuthMethodResponseProto getAuthMethod( } @Override - public TestProtos.AuthUserResponseProto getAuthUser( + public TestProtos.UserResponseProto getAuthUser( RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException { - UserGroupInformation authUser = null; + UserGroupInformation authUser; try { authUser = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new ServiceException(e); } - return TestProtos.AuthUserResponseProto.newBuilder() - .setAuthUser(authUser.getUserName()) - .build(); + return newUserResponse(authUser.getUserName()); } @Override @@ -447,6 +451,34 @@ public TestProtos.EmptyResponseProto sendPostponed( return TestProtos.EmptyResponseProto.newBuilder().build(); } + + @Override + public TestProtos.UserResponseProto getCurrentUser( + RpcController controller, + TestProtos.EmptyRequestProto request) throws ServiceException { + String user; + try { + user = UserGroupInformation.getCurrentUser().toString(); + } catch (IOException e) { + throw new ServiceException("Failed to get current user", e); + } + + return newUserResponse(user); + } + + @Override + public TestProtos.UserResponseProto getServerRemoteUser( + RpcController controller, + TestProtos.EmptyRequestProto request) throws ServiceException { + String serverRemoteUser = Server.getRemoteUser().toString(); + return newUserResponse(serverRemoteUser); + } + + private TestProtos.UserResponseProto newUserResponse(String user) { + return TestProtos.UserResponseProto.newBuilder() + .setUser(user) + .build(); + } } protected static TestProtos.EmptyRequestProto newEmptyRequest() { @@ -493,8 +525,4 @@ protected static AuthMethod convert( } return null; } - - protected static String convert(TestProtos.AuthUserResponseProto response) { - return response.getAuthUser(); - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index 72371a7ae9..c48ff2e186 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -45,30 +45,55 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.security.auth.callback.*; -import javax.security.sasl.*; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; import java.io.IOException; import java.lang.annotation.Annotation; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.security.Security; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*; -import static org.junit.Assert.*; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** Unit tests for using Sasl over RPC. */ @RunWith(Parameterized.class) public class TestSaslRPC extends TestRpcBase { @Parameters public static Collection data() { - Collection params = new ArrayList(); + Collection params = new ArrayList<>(); for (QualityOfProtection qop : QualityOfProtection.values()) { params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null }); } @@ -114,7 +139,7 @@ enum UseToken { NONE(), VALID(), INVALID(), - OTHER(); + OTHER() } @BeforeClass @@ -230,7 +255,7 @@ public void testDigestRpcWithoutAnnotation() throws Exception { final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } finally { - SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); + SecurityUtil.setSecurityInfoProviders(); } } @@ -259,7 +284,7 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm) addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token(tokenId, sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); @@ -296,8 +321,8 @@ public void testPingInterval() throws Exception { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), - TestRpcService.class, null, 0, null, newConf); + ConnectionId remoteId = ConnectionId.getConnectionId( + new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false @@ -806,13 +831,13 @@ private String internalGetAuthMethod( final TestTokenSecretManager sm = new TestTokenSecretManager(); boolean useSecretManager = (serverAuth != SIMPLE); if (enableSecretManager != null) { - useSecretManager &= enableSecretManager.booleanValue(); + useSecretManager &= enableSecretManager; } if (forceSecretManager != null) { - useSecretManager |= forceSecretManager.booleanValue(); + useSecretManager |= forceSecretManager; } final SecretManager serverSm = useSecretManager ? sm : null; - + Server server = serverUgi.doAs(new PrivilegedExceptionAction() { @Override public Server run() throws IOException { @@ -867,13 +892,13 @@ public String run() throws IOException { proxy.ping(null, newEmptyRequest()); // make sure the other side thinks we are who we said we are!!! assertEquals(clientUgi.getUserName(), - convert(proxy.getAuthUser(null, newEmptyRequest()))); + proxy.getAuthUser(null, newEmptyRequest()).getUser()); AuthMethod authMethod = convert(proxy.getAuthMethod(null, newEmptyRequest())); // verify sasl completed with correct QOP assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, - RPC.getConnectionIdForProxy(proxy).getSaslQop()); - return authMethod.toString(); + RPC.getConnectionIdForProxy(proxy).getSaslQop()); + return authMethod != null ? authMethod.toString() : null; } catch (ServiceException se) { if (se.getCause() instanceof RemoteException) { throw (RemoteException) se.getCause(); @@ -898,21 +923,18 @@ private static void assertAuthEquals(AuthMethod expect, String actual) { assertEquals(expect.toString(), actual); } - - private static void assertAuthEquals(Pattern expect, - String actual) { + + private static void assertAuthEquals(Pattern expect, String actual) { // this allows us to see the regexp and the value it didn't match if (!expect.matcher(actual).matches()) { - assertEquals(expect, actual); // it failed - } else { - assertTrue(true); // it matched + fail(); // it failed } } /* * Class used to test overriding QOP values using SaslPropertiesResolver */ - static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{ + static class AuthSaslPropertiesResolver extends SaslPropertiesResolver { @Override public Map getServerProperties(InetAddress address) { @@ -921,7 +943,7 @@ public Map getServerProperties(InetAddress address) { return newPropertes; } } - + public static void main(String[] args) throws Exception { System.out.println("Testing Kerberos authentication over RPC"); if (args.length != 2) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index 50d389c646..c4dbcac4c2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -17,40 +17,35 @@ */ package org.apache.hadoop.security; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenInfo; -import org.junit.Before; -import org.junit.Test; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector; -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; - /** - * + * Test do as effective user. */ -public class TestDoAsEffectiveUser { +public class TestDoAsEffectiveUser extends TestRpcBase { final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG"; final private static String REAL_USER_SHORT_NAME = "realUser1"; final private static String PROXY_USER_NAME = "proxyUser"; @@ -58,8 +53,8 @@ public class TestDoAsEffectiveUser { final private static String GROUP2_NAME = "group2"; final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME, GROUP2_NAME }; - private static final String ADDRESS = "0.0.0.0"; - private TestProtocol proxy; + + private TestRpcService client; private static final Configuration masterConf = new Configuration(); @@ -82,7 +77,7 @@ public void setMasterConf() throws IOException { private void configureSuperUserIPAddresses(Configuration conf, String superUserShortName) throws IOException { - ArrayList ipList = new ArrayList(); + ArrayList ipList = new ArrayList<>(); Enumeration netInterfaceList = NetworkInterface .getNetworkInterfaces(); while (netInterfaceList.hasMoreElements()) { @@ -130,50 +125,19 @@ public UserGroupInformation run() throws IOException { curUGI.toString()); } - @TokenInfo(TestTokenSelector.class) - public interface TestProtocol extends VersionedProtocol { - public static final long versionID = 1L; - - String aMethod() throws IOException; - String getServerRemoteUser() throws IOException; - } - - public class TestImpl implements TestProtocol { - - @Override - public String aMethod() throws IOException { - return UserGroupInformation.getCurrentUser().toString(); - } - - @Override - public String getServerRemoteUser() throws IOException { - return Server.getRemoteUser().toString(); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return TestProtocol.versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return new ProtocolSignature(TestProtocol.versionID, null); - } - } - - private void checkRemoteUgi(final Server server, - final UserGroupInformation ugi, final Configuration conf) - throws Exception { + private void checkRemoteUgi(final UserGroupInformation ugi, + final Configuration conf) throws Exception { ugi.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws IOException { - proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, - NetUtils.getConnectAddress(server), conf); - Assert.assertEquals(ugi.toString(), proxy.aMethod()); - Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser()); + public Void run() throws ServiceException { + client = getClient(addr, conf); + String currentUser = client.getCurrentUser(null, + newEmptyRequest()).getUser(); + String serverRemoteUser = client.getServerRemoteUser(null, + newEmptyRequest()).getUser(); + + Assert.assertEquals(ugi.toString(), currentUser); + Assert.assertEquals(ugi.toString(), serverRemoteUser); return null; } }); @@ -185,29 +149,27 @@ public void testRealUserSetup() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(server, realUserUgi, conf); + checkRemoteUgi(realUserUgi, conf); - UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting( + UserGroupInformation proxyUserUgi = + UserGroupInformation.createProxyUserForTesting( PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(server, proxyUserUgi, conf); + checkRemoteUgi(proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -218,29 +180,25 @@ public void testRealUserAuthorizationSuccess() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(server, realUserUgi, conf); + checkRemoteUgi(realUserUgi, conf); UserGroupInformation proxyUserUgi = UserGroupInformation .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(server, proxyUserUgi, conf); + checkRemoteUgi(proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -256,17 +214,14 @@ public void testRealUserIPAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -275,11 +230,10 @@ public void testRealUserIPAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -287,10 +241,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -299,17 +250,14 @@ public void testRealUserIPNotSpecified() throws IOException { final Configuration conf = new Configuration(); conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -318,11 +266,10 @@ public void testRealUserIPNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -330,10 +277,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -341,15 +285,12 @@ public String run() throws IOException { public void testRealUserGroupNotSpecified() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -358,11 +299,10 @@ public void testRealUserGroupNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -370,10 +310,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -384,17 +321,14 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group3"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -403,11 +337,10 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -415,10 +348,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -432,20 +362,17 @@ public void testProxyWithToken() throws Exception { final Configuration conf = new Configuration(masterConf); TestTokenSecretManager sm = new TestTokenSecretManager(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(conf); - final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build(); - - server.start(); + final Server server = setupTestServer(conf, 5, sm); final UserGroupInformation current = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); + TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token(tokenId, + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); UserGroupInformation proxyUserUgi = UserGroupInformation @@ -453,23 +380,19 @@ public void testProxyWithToken() throws Exception { proxyUserUgi.addToken(token); refreshConf(conf); - + String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } catch (Exception e) { e.printStackTrace(); throw e; } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } }); @@ -486,42 +409,34 @@ public void testTokenBySuperUser() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); final Configuration newConf = new Configuration(masterConf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(newConf, TestRpcService.class, + ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(newConf); - final Server server = new RPC.Builder(newConf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); - - server.start(); + final Server server = setupTestServer(newConf, 5, sm); final UserGroupInformation current = UserGroupInformation .createUserForTesting(REAL_USER_NAME, GROUP_NAMES); refreshConf(newConf); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); + TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token(tokenId, - sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); String retVal = current.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, newConf); - String ret = proxy.aMethod(); - return ret; + client = getClient(addr, newConf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } catch (Exception e) { e.printStackTrace(); throw e; } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } }); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java index b3ea5f23e3..e45d70db74 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -28,7 +29,11 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; @@ -50,9 +55,22 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; -import static org.apache.hadoop.ipc.TestSaslRPC.*; -import static org.apache.hadoop.test.MetricsAsserts.*; -import static org.junit.Assert.*; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; +import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; +import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -109,7 +127,7 @@ public void resetUgi() { UserGroupInformation.setLoginUser(null); } - @Test (timeout = 30000) + @Test(timeout = 30000) public void testSimpleLogin() throws IOException { tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 99cd93d711..6411f97ab6 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -88,6 +88,6 @@ message AuthMethodResponseProto { required string mechanismName = 2; } -message AuthUserResponseProto { - required string authUser = 1; +message UserResponseProto { + required string user = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 3292115885..06f6c4fc1d 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -40,9 +40,11 @@ service TestProtobufRpcProto { rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto); rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); - rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto); + rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto); rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); + rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto); + rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 6b52949868..57f7cb197b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -168,7 +168,6 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; -import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; @@ -317,8 +316,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); - - WritableRpcEngine.ensureInitialized(); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); if (serviceRpcAddr != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java deleted file mode 100644 index 0b7ee337d8..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.security; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.mockito.Mockito.mock; - -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslRpcClient; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.Test; - -/** Unit tests for using Delegation Token over RPC. */ -public class TestClientProtocolWithDelegationToken { - private static final String ADDRESS = "0.0.0.0"; - - public static final Log LOG = LogFactory - .getLog(TestClientProtocolWithDelegationToken.class); - - private static final Configuration conf; - static { - conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(conf); - } - - static { - GenericTestUtils.setLogLevel(Client.LOG, Level.ALL); - GenericTestUtils.setLogLevel(Server.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL); - } - - @Test - public void testDelegationTokenRpc() throws Exception { - ClientProtocol mockNN = mock(ClientProtocol.class); - FSNamesystem mockNameSys = mock(FSNamesystem.class); - - DelegationTokenSecretManager sm = new DelegationTokenSecretManager( - DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, - DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, - DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, - 3600000, mockNameSys); - sm.startThreads(); - final Server server = new RPC.Builder(conf) - .setProtocol(ClientProtocol.class).setInstance(mockNN) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); - - server.start(); - - final UserGroupInformation current = UserGroupInformation.getCurrentUser(); - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - String user = current.getUserName(); - Text owner = new Text(user); - DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null); - Token token = new Token( - dtId, sm); - SecurityUtil.setTokenService(token, addr); - LOG.info("Service for token is " + token.getService()); - current.addToken(token); - current.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - ClientProtocol proxy = null; - try { - proxy = RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, addr, conf); - proxy.getServerDefaults(); - } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } - } - return null; - } - }); - } - -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index 3fef5e278b..729af0a951 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -98,8 +97,6 @@ public void serviceInit(Configuration conf) throws Exception { BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService .newReflectiveBlockingService(refreshHSAdminProtocolXlator); - WritableRpcEngine.ensureInitialized(); - clientRpcAddress = conf.getSocketAddr( JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.JHS_ADMIN_ADDRESS,