diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index e68bfd4bbd..83e4b9ec8b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -60,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ThreadLocal> ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); - static { // Register the rpcRequest deserializer for ProtobufRpcEngine + static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class, new Server.ProtoBufRpcInvoker()); @@ -194,8 +194,7 @@ public Message invoke(Object proxy, final Method method, Object[] args) } if (args.length != 2) { // RpcController + Message - throw new ServiceException( - "Too many or few parameters for request. Method: [" + throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + args.length); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 12a07a543d..3f68d6334c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; @@ -28,6 +26,7 @@ import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; +import java.io.*; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -38,12 +37,11 @@ import javax.net.SocketFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.*; + import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.*; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; @@ -56,6 +54,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -88,7 +87,7 @@ public enum RpcKind { RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - private final short value; + public final short value; //TODO make it private RpcKind(short val) { this.value = val; @@ -208,7 +207,7 @@ static synchronized RpcEngine getProtocolEngine(Class protocol, RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), - ProtobufRpcEngine.class); + WritableRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); PROTOCOL_ENGINES.put(protocol, engine); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 531d5741dd..f20ba94984 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -237,14 +237,14 @@ private static Set addExceptions( static class RpcKindMapValue { final Class rpcRequestWrapperClass; final RpcInvoker rpcInvoker; - RpcKindMapValue (Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { this.rpcInvoker = rpcInvoker; this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - static Map rpcKindMap = new HashMap<>(4); + static Map rpcKindMap = new + HashMap(4); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index ed3a9d053e..0ad9abc539 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -730,7 +730,7 @@ public static UserGroupInformation getBestUGI( * * @param user The principal name to load from the ticket * cache - * @param ticketCache the path to the ticket cache file + * @param ticketCachePath the path to the ticket cache file * * @throws IOException if the kerberos login fails */ @@ -790,7 +790,7 @@ public static UserGroupInformation getUGIFromTicketCache( /** * Create a UserGroupInformation from a Subject with Kerberos principal. * - * @param subject The KerberosPrincipal to use in UGI + * @param user The KerberosPrincipal to use in UGI * * @throws IOException if the kerberos login fails */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java index 9356dabe2f..eb7b949709 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.ipc; -import com.google.common.base.Joiner; -import com.google.protobuf.BlockingService; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -29,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -39,12 +45,8 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Joiner; +import com.google.protobuf.BlockingService; /** * Benchmark for protobuf RPC. @@ -66,7 +68,7 @@ private static class MyOptions { public int secondsToRun = 15; private int msgSize = 1024; public Class rpcEngine = - ProtobufRpcEngine.class; + WritableRpcEngine.class; private MyOptions(String args[]) { try { @@ -133,7 +135,7 @@ private Options buildOptions() { opts.addOption( OptionBuilder.withLongOpt("engine").hasArg(true) - .withArgName("protobuf") + .withArgName("writable|protobuf") .withDescription("engine to use") .create('e')); @@ -182,6 +184,8 @@ private void processOptions(CommandLine line, Options opts) String eng = line.getOptionValue('e'); if ("protobuf".equals(eng)) { rpcEngine = ProtobufRpcEngine.class; + } else if ("writable".equals(eng)) { + rpcEngine = WritableRpcEngine.class; } else { throw new ParseException("invalid engine: " + eng); } @@ -233,6 +237,11 @@ private Server startServer(MyOptions opts) throws IOException { server = new RPC.Builder(conf).setProtocol(TestRpcService.class) .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort()) .setNumHandlers(opts.serverThreads).setVerbose(false).build(); + } else if (opts.rpcEngine == WritableRpcEngine.class) { + server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host) + .setPort(opts.getPort()).setNumHandlers(opts.serverThreads) + .setVerbose(false).build(); } else { throw new RuntimeException("Bad engine: " + opts.rpcEngine); } @@ -390,6 +399,15 @@ public String doEcho(String msg) throws Exception { return responseProto.getMessage(); } }; + } else if (opts.rpcEngine == WritableRpcEngine.class) { + final TestProtocol proxy = RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + return new RpcServiceWrapper() { + @Override + public String doEcho(String msg) throws Exception { + return proxy.echo(msg); + } + }; } else { throw new RuntimeException("unsupported engine: " + opts.rpcEngine); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index 10e23baefe..8b419e36d4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -17,28 +17,252 @@ */ package org.apache.hadoop.ipc; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.junit.Assert; + import org.apache.hadoop.conf.Configuration; -import org.junit.After; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.net.NetUtils; import org.junit.Before; +import org.junit.After; import org.junit.Test; +import com.google.protobuf.BlockingService; public class TestMultipleProtocolServer extends TestRpcBase { - + private static InetSocketAddress addr; private static RPC.Server server; - @Before - public void setUp() throws Exception { - super.setupConf(); - - server = setupTestServer(conf, 2); + private static Configuration conf = new Configuration(); + + + @ProtocolInfo(protocolName="Foo") + interface Foo0 extends VersionedProtocol { + public static final long versionID = 0L; + String ping() throws IOException; + + } + + @ProtocolInfo(protocolName="Foo") + interface Foo1 extends VersionedProtocol { + public static final long versionID = 1L; + String ping() throws IOException; + String ping2() throws IOException; + } + + @ProtocolInfo(protocolName="Foo") + interface FooUnimplemented extends VersionedProtocol { + public static final long versionID = 2L; + String ping() throws IOException; + } + + interface Mixin extends VersionedProtocol{ + public static final long versionID = 0L; + void hello() throws IOException; } + interface Bar extends Mixin { + public static final long versionID = 0L; + int echo(int i) throws IOException; + } + + class Foo0Impl implements Foo0 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo0.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo0"; + } + + } + + class Foo1Impl implements Foo1 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo1.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo1"; + } + + @Override + public String ping2() { + return "Foo1"; + + } + + } + + + class BarImpl implements Bar { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Bar.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public int echo(int i) { + return i; + } + + @Override + public void hello() { + + + } + } + @Before + public void setUp() throws Exception { + // create a server with two handlers + server = new RPC.Builder(conf).setProtocol(Foo0.class) + .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); + + + // Add Protobuf server + // Create server side implementation + PBServerImpl pbServerImpl = new PBServerImpl(); + BlockingService service = TestProtobufRpcProto + .newReflectiveBlockingService(pbServerImpl); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, + service); + server.start(); + addr = NetUtils.getConnectAddress(server); + } + @After public void tearDown() throws Exception { server.stop(); } + @Test + public void test1() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); + Foo0 foo0 = (Foo0)proxy.getProxy(); + Assert.assertEquals("Foo0", foo0.ping()); + + + proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); + + + Foo1 foo1 = (Foo1)proxy.getProxy(); + Assert.assertEquals("Foo1", foo1.ping()); + Assert.assertEquals("Foo1", foo1.ping()); + + + proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); + + + Bar bar = (Bar)proxy.getProxy(); + Assert.assertEquals(99, bar.echo(99)); + + // Now test Mixin class method + + Mixin mixin = bar; + mixin.hello(); + } + + + // Server does not implement the FooUnimplemented version of protocol Foo. + // See that calls to it fail. + @Test(expected=IOException.class) + public void testNonExistingProtocol() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + foo.ping(); + } + + /** + * getProtocolVersion of an unimplemented version should return highest version + * Similarly getProtocolSignature should work. + * @throws IOException + */ + @Test + public void testNonExistingProtocol2() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + Assert.assertEquals(Foo1.versionID, + foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID)); + foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID, 0); + } + + @Test(expected=IOException.class) + public void testIncorrectServerCreation() throws IOException { + new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl()) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false) + .build(); + } + // Now test a PB service - a server hosts both PB and Writable Rpcs. @Test public void testPBService() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java index 6d83d7d368..969f728f77 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java @@ -25,6 +25,19 @@ public class TestRPCCallBenchmark { + @Test(timeout=20000) + public void testBenchmarkWithWritable() throws Exception { + int rc = ToolRunner.run(new RPCCallBenchmark(), + new String[] { + "--clientThreads", "30", + "--serverThreads", "30", + "--time", "5", + "--serverReaderThreads", "4", + "--messageSize", "1024", + "--engine", "writable"}); + assertEquals(0, rc); + } + @Test(timeout=20000) public void testBenchmarkWithProto() throws Exception { int rc = ToolRunner.run(new RPCCallBenchmark(), diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index a06d9fdc01..2ac2be990d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -18,19 +18,27 @@ package org.apache.hadoop.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import org.junit.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; +import org.apache.hadoop.net.NetUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** Unit test for supporting method-name based compatible RPCs. */ public class TestRPCCompatibility { @@ -41,7 +49,7 @@ public class TestRPCCompatibility { public static final Log LOG = LogFactory.getLog(TestRPCCompatibility.class); - + private static Configuration conf = new Configuration(); public interface TestProtocol0 extends VersionedProtocol { @@ -112,21 +120,6 @@ public long getProtocolVersion(String protocol, @Before public void setUp() { ProtocolSignature.resetCache(); - - RPC.setProtocolEngine(conf, - TestProtocol0.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol1.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol2.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol3.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol4.class, ProtobufRpcEngine.class); } @After @@ -140,7 +133,117 @@ public void tearDown() { server = null; } } + + @Test // old client vs new server + public void testVersion0ClientVersion1Server() throws Exception { + // create a server with two handlers + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + proxy = RPC.getProtocolProxy( + TestProtocol0.class, TestProtocol0.versionID, addr, conf); + + TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy(); + proxy0.ping(); + } + + @Test // old client vs new server + public void testVersion1ClientVersion0Server() throws Exception { + // create a server with two handlers + server = new RPC.Builder(conf).setProtocol(TestProtocol0.class) + .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + proxy = RPC.getProtocolProxy( + TestProtocol1.class, TestProtocol1.versionID, addr, conf); + + TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy(); + proxy1.ping(); + try { + proxy1.echo("hello"); + fail("Echo should fail"); + } catch(IOException e) { + } + } + + private class Version2Client { + + private TestProtocol2 proxy2; + private ProtocolProxy serverInfo; + + private Version2Client() throws IOException { + serverInfo = RPC.getProtocolProxy( + TestProtocol2.class, TestProtocol2.versionID, addr, conf); + proxy2 = serverInfo.getProxy(); + } + + public int echo(int value) throws IOException, NumberFormatException { + if (serverInfo.isMethodSupported("echo", int.class)) { +System.out.println("echo int is supported"); + return -value; // use version 3 echo long + } else { // server is version 2 +System.out.println("echo int is NOT supported"); + return Integer.parseInt(proxy2.echo(String.valueOf(value))); + } + } + + public String echo(String value) throws IOException { + return proxy2.echo(value); + } + + public void ping() throws IOException { + proxy2.ping(); + } + } + + @Test // Compatible new client & old server + public void testVersion2ClientVersion1Server() throws Exception { + // create a server with two handlers + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + + + Version2Client client = new Version2Client(); + client.ping(); + assertEquals("hello", client.echo("hello")); + + // echo(int) is not supported by server, so returning 3 + // This verifies that echo(int) and echo(String)'s hash codes are different + assertEquals(3, client.echo(3)); + } + + @Test // equal version client and server + public void testVersion2ClientVersion2Server() throws Exception { + // create a server with two handlers + TestImpl2 impl = new TestImpl2(); + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + + Version2Client client = new Version2Client(); + + client.ping(); + assertEquals("hello", client.echo("hello")); + + // now that echo(int) is supported by the server, echo(int) should return -3 + assertEquals(-3, client.echo(3)); + } + public interface TestProtocol3 { int echo(String value); int echo(int value); @@ -194,4 +297,97 @@ public interface TestProtocol4 extends TestProtocol2 { @Override int echo(int value) throws IOException; } + + @Test + public void testVersionMismatch() throws IOException { + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class, + TestProtocol4.versionID, addr, conf); + try { + proxy.echo(21); + fail("The call must throw VersionMismatch exception"); + } catch (RemoteException ex) { + Assert.assertEquals(RPC.VersionMismatch.class.getName(), + ex.getClassName()); + Assert.assertTrue(ex.getErrorCode().equals( + RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH)); + } catch (IOException ex) { + fail("Expected version mismatch but got " + ex); + } + } + + @Test + public void testIsMethodSupported() throws IOException { + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. + */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RPC.RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RPC.RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } + } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class protocol, RPC.RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java index b22f91b8e8..5807998a15 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -28,13 +30,11 @@ import java.net.InetSocketAddress; import java.nio.channels.ClosedByInterruptException; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; - /** * tests that the proxy can be interrupted */ -public class TestRPCWaitForProxy extends TestRpcBase { +public class TestRPCWaitForProxy extends Assert { + private static final String ADDRESS = "0.0.0.0"; private static final Logger LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); @@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase { * * @throws Throwable any exception other than that which was expected */ - @Test(timeout = 50000) + @Test(timeout = 10000) public void testWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(0); worker.start(); worker.join(); Throwable caught = worker.getCaught(); - Throwable cause = caught.getCause(); - Assert.assertNotNull("No exception was raised", cause); - if (!(cause instanceof ConnectException)) { + assertNotNull("No exception was raised", caught); + if (!(caught instanceof ConnectException)) { throw caught; } } @@ -70,11 +69,11 @@ public void testInterruptedWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(100); worker.start(); Thread.sleep(1000); - Assert.assertTrue("worker hasn't started", worker.waitStarted); + assertTrue("worker hasn't started", worker.waitStarted); worker.interrupt(); worker.join(); Throwable caught = worker.getCaught(); - Assert.assertNotNull("No exception was raised", caught); + assertNotNull("No exception was raised", caught); // looking for the root cause here, which can be wrapped // as part of the NetUtils work. Having this test look // a the type of exception there would be brittle to improvements @@ -83,8 +82,6 @@ public void testInterruptedWaitForProxy() throws Throwable { if (cause == null) { // no inner cause, use outer exception as root cause. cause = caught; - } else if (cause.getCause() != null) { - cause = cause.getCause(); } if (!(cause instanceof InterruptedIOException) && !(cause instanceof ClosedByInterruptException)) { @@ -115,16 +112,12 @@ public void run() { IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, connectRetries); waitStarted = true; - - short invalidPort = 20; - InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, - invalidPort); - TestRpcBase.TestRpcService proxy = RPC.getProxy( - TestRpcBase.TestRpcService.class, - 1L, invalidAddress, conf); - // Test echo method - proxy.echo(null, newEchoRequest("hello")); - + TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, + TestProtocol.versionID, + new InetSocketAddress(ADDRESS, 20), + config, + 15000L); + proxy.echo(""); } catch (Throwable throwable) { caught = throwable; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index 5a8f8d0124..bc604a47ef 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -112,8 +112,7 @@ protected static RPC.Server setupTestServer(Configuration serverConf, return setupTestServer(builder); } - protected static RPC.Server setupTestServer( - RPC.Builder builder) throws IOException { + protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException { RPC.Server server = builder.build(); server.start(); @@ -176,21 +175,17 @@ public static class TestTokenIdentifier extends TokenIdentifier { public TestTokenIdentifier() { this(new Text(), new Text()); } - public TestTokenIdentifier(Text tokenid) { this(tokenid, new Text()); } - public TestTokenIdentifier(Text tokenid, Text realUser) { this.tokenid = tokenid == null ? new Text() : tokenid; this.realUser = realUser == null ? new Text() : realUser; } - @Override public Text getKind() { return KIND_NAME; } - @Override public UserGroupInformation getUser() { if (realUser.toString().isEmpty()) { @@ -208,7 +203,6 @@ public void readFields(DataInput in) throws IOException { tokenid.readFields(in); realUser.readFields(in); } - @Override public void write(DataOutput out) throws IOException { tokenid.write(out); @@ -240,7 +234,7 @@ public static class TestTokenSelector implements @SuppressWarnings("unchecked") @Override public Token selectToken(Text service, - Collection> tokens) { + Collection> tokens) { if (service == null) { return null; } @@ -394,17 +388,19 @@ public TestProtos.AuthMethodResponseProto getAuthMethod( } @Override - public TestProtos.UserResponseProto getAuthUser( + public TestProtos.AuthUserResponseProto getAuthUser( RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException { - UserGroupInformation authUser; + UserGroupInformation authUser = null; try { authUser = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new ServiceException(e); } - return newUserResponse(authUser.getUserName()); + return TestProtos.AuthUserResponseProto.newBuilder() + .setAuthUser(authUser.getUserName()) + .build(); } @Override @@ -436,34 +432,6 @@ public TestProtos.EmptyResponseProto sendPostponed( return TestProtos.EmptyResponseProto.newBuilder().build(); } - - @Override - public TestProtos.UserResponseProto getCurrentUser( - RpcController controller, - TestProtos.EmptyRequestProto request) throws ServiceException { - String user; - try { - user = UserGroupInformation.getCurrentUser().toString(); - } catch (IOException e) { - throw new ServiceException("Failed to get current user", e); - } - - return newUserResponse(user); - } - - @Override - public TestProtos.UserResponseProto getServerRemoteUser( - RpcController controller, - TestProtos.EmptyRequestProto request) throws ServiceException { - String serverRemoteUser = Server.getRemoteUser().toString(); - return newUserResponse(serverRemoteUser); - } - - private TestProtos.UserResponseProto newUserResponse(String user) { - return TestProtos.UserResponseProto.newBuilder() - .setUser(user) - .build(); - } } protected static TestProtos.EmptyRequestProto newEmptyRequest() { @@ -510,4 +478,8 @@ protected static AuthMethod convert( } return null; } + + protected static String convert(TestProtos.AuthUserResponseProto response) { + return response.getAuthUser(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index c48ff2e186..72371a7ae9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -45,55 +45,30 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import javax.security.auth.callback.*; +import javax.security.sasl.*; import java.io.IOException; import java.lang.annotation.Annotation; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.security.Security; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*; +import static org.junit.Assert.*; /** Unit tests for using Sasl over RPC. */ @RunWith(Parameterized.class) public class TestSaslRPC extends TestRpcBase { @Parameters public static Collection data() { - Collection params = new ArrayList<>(); + Collection params = new ArrayList(); for (QualityOfProtection qop : QualityOfProtection.values()) { params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null }); } @@ -139,7 +114,7 @@ enum UseToken { NONE(), VALID(), INVALID(), - OTHER() + OTHER(); } @BeforeClass @@ -255,7 +230,7 @@ public void testDigestRpcWithoutAnnotation() throws Exception { final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } finally { - SecurityUtil.setSecurityInfoProviders(); + SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); } } @@ -284,7 +259,7 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm) addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token<>(tokenId, sm); + Token token = new Token(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); @@ -321,8 +296,8 @@ public void testPingInterval() throws Exception { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = ConnectionId.getConnectionId( - new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf); + ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), + TestRpcService.class, null, 0, null, newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false @@ -831,13 +806,13 @@ private String internalGetAuthMethod( final TestTokenSecretManager sm = new TestTokenSecretManager(); boolean useSecretManager = (serverAuth != SIMPLE); if (enableSecretManager != null) { - useSecretManager &= enableSecretManager; + useSecretManager &= enableSecretManager.booleanValue(); } if (forceSecretManager != null) { - useSecretManager |= forceSecretManager; + useSecretManager |= forceSecretManager.booleanValue(); } final SecretManager serverSm = useSecretManager ? sm : null; - + Server server = serverUgi.doAs(new PrivilegedExceptionAction() { @Override public Server run() throws IOException { @@ -892,13 +867,13 @@ public String run() throws IOException { proxy.ping(null, newEmptyRequest()); // make sure the other side thinks we are who we said we are!!! assertEquals(clientUgi.getUserName(), - proxy.getAuthUser(null, newEmptyRequest()).getUser()); + convert(proxy.getAuthUser(null, newEmptyRequest()))); AuthMethod authMethod = convert(proxy.getAuthMethod(null, newEmptyRequest())); // verify sasl completed with correct QOP assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, - RPC.getConnectionIdForProxy(proxy).getSaslQop()); - return authMethod != null ? authMethod.toString() : null; + RPC.getConnectionIdForProxy(proxy).getSaslQop()); + return authMethod.toString(); } catch (ServiceException se) { if (se.getCause() instanceof RemoteException) { throw (RemoteException) se.getCause(); @@ -923,18 +898,21 @@ private static void assertAuthEquals(AuthMethod expect, String actual) { assertEquals(expect.toString(), actual); } - - private static void assertAuthEquals(Pattern expect, String actual) { + + private static void assertAuthEquals(Pattern expect, + String actual) { // this allows us to see the regexp and the value it didn't match if (!expect.matcher(actual).matches()) { - fail(); // it failed + assertEquals(expect, actual); // it failed + } else { + assertTrue(true); // it matched } } /* * Class used to test overriding QOP values using SaslPropertiesResolver */ - static class AuthSaslPropertiesResolver extends SaslPropertiesResolver { + static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{ @Override public Map getServerProperties(InetAddress address) { @@ -943,7 +921,7 @@ public Map getServerProperties(InetAddress address) { return newPropertes; } } - + public static void main(String[] args) throws Exception { System.out.println("Testing Kerberos authentication over RPC"); if (args.length != 2) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index c4dbcac4c2..50d389c646 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -17,35 +17,40 @@ */ package org.apache.hadoop.security; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.TestRpcBase; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.token.Token; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenInfo; +import org.junit.Before; +import org.junit.Test; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector; +import org.apache.commons.logging.*; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + /** - * Test do as effective user. + * */ -public class TestDoAsEffectiveUser extends TestRpcBase { +public class TestDoAsEffectiveUser { final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG"; final private static String REAL_USER_SHORT_NAME = "realUser1"; final private static String PROXY_USER_NAME = "proxyUser"; @@ -53,8 +58,8 @@ public class TestDoAsEffectiveUser extends TestRpcBase { final private static String GROUP2_NAME = "group2"; final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME, GROUP2_NAME }; - - private TestRpcService client; + private static final String ADDRESS = "0.0.0.0"; + private TestProtocol proxy; private static final Configuration masterConf = new Configuration(); @@ -77,7 +82,7 @@ public void setMasterConf() throws IOException { private void configureSuperUserIPAddresses(Configuration conf, String superUserShortName) throws IOException { - ArrayList ipList = new ArrayList<>(); + ArrayList ipList = new ArrayList(); Enumeration netInterfaceList = NetworkInterface .getNetworkInterfaces(); while (netInterfaceList.hasMoreElements()) { @@ -125,19 +130,50 @@ public UserGroupInformation run() throws IOException { curUGI.toString()); } - private void checkRemoteUgi(final UserGroupInformation ugi, - final Configuration conf) throws Exception { + @TokenInfo(TestTokenSelector.class) + public interface TestProtocol extends VersionedProtocol { + public static final long versionID = 1L; + + String aMethod() throws IOException; + String getServerRemoteUser() throws IOException; + } + + public class TestImpl implements TestProtocol { + + @Override + public String aMethod() throws IOException { + return UserGroupInformation.getCurrentUser().toString(); + } + + @Override + public String getServerRemoteUser() throws IOException { + return Server.getRemoteUser().toString(); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return TestProtocol.versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return new ProtocolSignature(TestProtocol.versionID, null); + } + } + + private void checkRemoteUgi(final Server server, + final UserGroupInformation ugi, final Configuration conf) + throws Exception { ugi.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws ServiceException { - client = getClient(addr, conf); - String currentUser = client.getCurrentUser(null, - newEmptyRequest()).getUser(); - String serverRemoteUser = client.getServerRemoteUser(null, - newEmptyRequest()).getUser(); - - Assert.assertEquals(ugi.toString(), currentUser); - Assert.assertEquals(ugi.toString(), serverRemoteUser); + public Void run() throws IOException { + proxy = RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + Assert.assertEquals(ugi.toString(), proxy.aMethod()); + Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser()); return null; } }); @@ -149,27 +185,29 @@ public void testRealUserSetup() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(5).setVerbose(true).build(); refreshConf(conf); try { + server.start(); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(realUserUgi, conf); + checkRemoteUgi(server, realUserUgi, conf); - UserGroupInformation proxyUserUgi = - UserGroupInformation.createProxyUserForTesting( + UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting( PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(proxyUserUgi, conf); + checkRemoteUgi(server, proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -180,25 +218,29 @@ public void testRealUserAuthorizationSuccess() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(realUserUgi, conf); + checkRemoteUgi(server, realUserUgi, conf); UserGroupInformation proxyUserUgi = UserGroupInformation .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(proxyUserUgi, conf); + checkRemoteUgi(server, proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -214,14 +256,17 @@ public void testRealUserIPAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -230,10 +275,11 @@ public void testRealUserIPAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -241,7 +287,10 @@ public String run() throws ServiceException { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -250,14 +299,17 @@ public void testRealUserIPNotSpecified() throws IOException { final Configuration conf = new Configuration(); conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -266,10 +318,11 @@ public void testRealUserIPNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -277,7 +330,10 @@ public String run() throws ServiceException { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -285,12 +341,15 @@ public String run() throws ServiceException { public void testRealUserGroupNotSpecified() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -299,10 +358,11 @@ public void testRealUserGroupNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -310,7 +370,10 @@ public String run() throws ServiceException { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -321,14 +384,17 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group3"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -337,10 +403,11 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -348,7 +415,10 @@ public String run() throws ServiceException { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -362,17 +432,20 @@ public void testProxyWithToken() throws Exception { final Configuration conf = new Configuration(masterConf); TestTokenSecretManager sm = new TestTokenSecretManager(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5, sm); + final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build(); + + server.start(); final UserGroupInformation current = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token<>(tokenId, + Token token = new Token(tokenId, sm); SecurityUtil.setTokenService(token, addr); UserGroupInformation proxyUserUgi = UserGroupInformation @@ -380,19 +453,23 @@ public void testProxyWithToken() throws Exception { proxyUserUgi.addToken(token); refreshConf(conf); - + String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } catch (Exception e) { e.printStackTrace(); throw e; } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } }); @@ -409,34 +486,42 @@ public void testTokenBySuperUser() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); final Configuration newConf = new Configuration(masterConf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf); - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(newConf, TestRpcService.class, - ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(newConf); - final Server server = setupTestServer(newConf, 5, sm); + final Server server = new RPC.Builder(newConf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .setSecretManager(sm).build(); + + server.start(); final UserGroupInformation current = UserGroupInformation .createUserForTesting(REAL_USER_NAME, GROUP_NAMES); refreshConf(newConf); - + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token<>(tokenId, sm); + Token token = new Token(tokenId, + sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); String retVal = current.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - client = getClient(addr, newConf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, newConf); + String ret = proxy.aMethod(); + return ret; } catch (Exception e) { e.printStackTrace(); throw e; } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } }); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java index e45d70db74..b3ea5f23e3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -29,11 +28,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; @@ -55,22 +50,9 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; -import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; -import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; -import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.hadoop.ipc.TestSaslRPC.*; +import static org.apache.hadoop.test.MetricsAsserts.*; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,7 +109,7 @@ public void resetUgi() { UserGroupInformation.setLoginUser(null); } - @Test(timeout = 30000) + @Test (timeout = 30000) public void testSimpleLogin() throws IOException { tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 6411f97ab6..99cd93d711 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -88,6 +88,6 @@ message AuthMethodResponseProto { required string mechanismName = 2; } -message UserResponseProto { - required string user = 1; +message AuthUserResponseProto { + required string authUser = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 06f6c4fc1d..3292115885 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -40,11 +40,9 @@ service TestProtobufRpcProto { rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto); rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); - rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto); + rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto); rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); - rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto); - rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57f7cb197b..6b52949868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -168,6 +168,7 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; @@ -316,6 +317,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); + + WritableRpcEngine.ensureInitialized(); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); if (serviceRpcAddr != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java new file mode 100644 index 0000000000..0b7ee337d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.security; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.mockito.Mockito.mock; + +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslRpcClient; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Test; + +/** Unit tests for using Delegation Token over RPC. */ +public class TestClientProtocolWithDelegationToken { + private static final String ADDRESS = "0.0.0.0"; + + public static final Log LOG = LogFactory + .getLog(TestClientProtocolWithDelegationToken.class); + + private static final Configuration conf; + static { + conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + static { + GenericTestUtils.setLogLevel(Client.LOG, Level.ALL); + GenericTestUtils.setLogLevel(Server.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL); + } + + @Test + public void testDelegationTokenRpc() throws Exception { + ClientProtocol mockNN = mock(ClientProtocol.class); + FSNamesystem mockNameSys = mock(FSNamesystem.class); + + DelegationTokenSecretManager sm = new DelegationTokenSecretManager( + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, + 3600000, mockNameSys); + sm.startThreads(); + final Server server = new RPC.Builder(conf) + .setProtocol(ClientProtocol.class).setInstance(mockNN) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .setSecretManager(sm).build(); + + server.start(); + + final UserGroupInformation current = UserGroupInformation.getCurrentUser(); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + String user = current.getUserName(); + Text owner = new Text(user); + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null); + Token token = new Token( + dtId, sm); + SecurityUtil.setTokenService(token, addr); + LOG.info("Service for token is " + token.getService()); + current.addToken(token); + current.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + ClientProtocol proxy = null; + try { + proxy = RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, addr, conf); + proxy.getServerDefaults(); + } finally { + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + return null; + } + }); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index 729af0a951..3fef5e278b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -97,6 +98,8 @@ public void serviceInit(Configuration conf) throws Exception { BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService .newReflectiveBlockingService(refreshHSAdminProtocolXlator); + WritableRpcEngine.ensureInitialized(); + clientRpcAddress = conf.getSocketAddr( JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.JHS_ADMIN_ADDRESS,