diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 0f43fc6d3d..7c11e22e57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -67,7 +67,7 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ThreadLocal> ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); - static { // Register the rpcRequest deserializer for WritableRpcEngine + static { // Register the rpcRequest deserializer for ProtobufRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, new Server.ProtoBufRpcInvoker()); @@ -201,7 +201,8 @@ public Object invoke(Object proxy, final Method method, Object[] args) } if (args.length != 2) { // RpcController + Message - throw new ServiceException("Too many parameters for request. Method: [" + throw new ServiceException( + "Too many or few parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + args.length); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 3f68d6334c..a544f2fc66 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; @@ -26,7 +28,6 @@ import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; -import java.io.*; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -37,11 +38,12 @@ import javax.net.SocketFactory; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; @@ -54,7 +56,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -84,10 +85,10 @@ public class RPC { final static int RPC_SERVICE_CLASS_DEFAULT = 0; public enum RpcKind { RPC_BUILTIN ((short) 1), // Used for built in calls by tests - RPC_WRITABLE ((short) 2), // Use WritableRpcEngine + // 2 for WritableRpcEngine, obsolete and removed RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - public final short value; //TODO make it private + private final short value; RpcKind(short val) { this.value = val; @@ -207,7 +208,7 @@ static synchronized RpcEngine getProtocolEngine(Class protocol, RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), - WritableRpcEngine.class); + ProtobufRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); PROTOCOL_ENGINES.put(protocol, engine); } @@ -949,10 +950,10 @@ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, return new VerProtocolImpl(highestVersion, highest); } - protected Server(String bindAddress, int port, + protected Server(String bindAddress, int port, Class paramClass, int handlerCount, int numReaders, int queueSizePerHandler, - Configuration conf, String serverName, + Configuration conf, String serverName, SecretManager secretManager, String portRangeConfig) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 88c1f3c826..be46e765b1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -243,14 +243,14 @@ private static Set addExceptions( static class RpcKindMapValue { final Class rpcRequestWrapperClass; final RpcInvoker rpcInvoker; + RpcKindMapValue (Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { this.rpcInvoker = rpcInvoker; this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - static Map rpcKindMap = new - HashMap(4); + static Map rpcKindMap = new HashMap<>(4); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java deleted file mode 100644 index a9dbb41fd9..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ /dev/null @@ -1,564 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import java.lang.reflect.Proxy; -import java.lang.reflect.Method; -import java.lang.reflect.InvocationTargetException; - -import java.net.InetSocketAddress; -import java.io.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.SocketFactory; - -import org.apache.commons.logging.*; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.*; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; - -/** An RpcEngine implementation for Writable data. */ -@InterfaceStability.Evolving -public class WritableRpcEngine implements RpcEngine { - private static final Log LOG = LogFactory.getLog(RPC.class); - - //writableRpcVersion should be updated if there is a change - //in format of the rpc messages. - - // 2L - added declared class to Invocation - public static final long writableRpcVersion = 2L; - - /** - * Whether or not this class has been initialized. - */ - private static boolean isInitialized = false; - - static { - ensureInitialized(); - } - - /** - * Initialize this class if it isn't already. - */ - public static synchronized void ensureInitialized() { - if (!isInitialized) { - initialize(); - } - } - - /** - * Register the rpcRequest deserializer for WritableRpcEngine - */ - private static synchronized void initialize() { - org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE, - Invocation.class, new Server.WritableRpcInvoker()); - isInitialized = true; - } - - - /** A method invocation, including the method name and its parameters.*/ - private static class Invocation implements Writable, Configurable { - private String methodName; - private Class[] parameterClasses; - private Object[] parameters; - private Configuration conf; - private long clientVersion; - private int clientMethodsHash; - private String declaringClassProtocolName; - - //This could be different from static writableRpcVersion when received - //at server, if client is using a different version. - private long rpcVersion; - - @SuppressWarnings("unused") // called when deserializing an invocation - public Invocation() {} - - public Invocation(Method method, Object[] parameters) { - this.methodName = method.getName(); - this.parameterClasses = method.getParameterTypes(); - this.parameters = parameters; - rpcVersion = writableRpcVersion; - if (method.getDeclaringClass().equals(VersionedProtocol.class)) { - //VersionedProtocol is exempted from version check. - clientVersion = 0; - clientMethodsHash = 0; - } else { - this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); - this.clientMethodsHash = ProtocolSignature.getFingerprint(method - .getDeclaringClass().getMethods()); - } - this.declaringClassProtocolName = - RPC.getProtocolName(method.getDeclaringClass()); - } - - /** The name of the method invoked. */ - public String getMethodName() { return methodName; } - - /** The parameter classes. */ - public Class[] getParameterClasses() { return parameterClasses; } - - /** The parameter instances. */ - public Object[] getParameters() { return parameters; } - - private long getProtocolVersion() { - return clientVersion; - } - - @SuppressWarnings("unused") - private int getClientMethodsHash() { - return clientMethodsHash; - } - - /** - * Returns the rpc version used by the client. - * @return rpcVersion - */ - public long getRpcVersion() { - return rpcVersion; - } - - @Override - @SuppressWarnings("deprecation") - public void readFields(DataInput in) throws IOException { - rpcVersion = in.readLong(); - declaringClassProtocolName = UTF8.readString(in); - methodName = UTF8.readString(in); - clientVersion = in.readLong(); - clientMethodsHash = in.readInt(); - parameters = new Object[in.readInt()]; - parameterClasses = new Class[parameters.length]; - ObjectWritable objectWritable = new ObjectWritable(); - for (int i = 0; i < parameters.length; i++) { - parameters[i] = - ObjectWritable.readObject(in, objectWritable, this.conf); - parameterClasses[i] = objectWritable.getDeclaredClass(); - } - } - - @Override - @SuppressWarnings("deprecation") - public void write(DataOutput out) throws IOException { - out.writeLong(rpcVersion); - UTF8.writeString(out, declaringClassProtocolName); - UTF8.writeString(out, methodName); - out.writeLong(clientVersion); - out.writeInt(clientMethodsHash); - out.writeInt(parameterClasses.length); - for (int i = 0; i < parameterClasses.length; i++) { - ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], - conf, true); - } - } - - @Override - public String toString() { - StringBuilder buffer = new StringBuilder(); - buffer.append(methodName); - buffer.append("("); - for (int i = 0; i < parameters.length; i++) { - if (i != 0) - buffer.append(", "); - buffer.append(parameters[i]); - } - buffer.append(")"); - buffer.append(", rpc version="+rpcVersion); - buffer.append(", client version="+clientVersion); - buffer.append(", methodsFingerPrint="+clientMethodsHash); - return buffer.toString(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - } - - private static ClientCache CLIENTS=new ClientCache(); - - private static class Invoker implements RpcInvocationHandler { - private Client.ConnectionId remoteId; - private Client client; - private boolean isClosed = false; - private final AtomicBoolean fallbackToSimpleAuth; - - public Invoker(Class protocol, - InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) - throws IOException { - this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, - ticket, rpcTimeout, null, conf); - this.client = CLIENTS.getClient(conf, factory); - this.fallbackToSimpleAuth = fallbackToSimpleAuth; - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - long startTime = 0; - if (LOG.isDebugEnabled()) { - startTime = Time.now(); - } - - // if Tracing is on then start a new span for this rpc. - // guard it in the if statement to make sure there isn't - // any extra string manipulation. - Tracer tracer = Tracer.curThreadTracer(); - TraceScope traceScope = null; - if (tracer != null) { - traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method)); - } - ObjectWritable value; - try { - value = (ObjectWritable) - client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), - remoteId, fallbackToSimpleAuth); - } finally { - if (traceScope != null) traceScope.close(); - } - if (LOG.isDebugEnabled()) { - long callTime = Time.now() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); - } - return value.get(); - } - - /* close the IPC client that's responsible for this invoker's RPCs */ - @Override - synchronized public void close() { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } - - @Override - public ConnectionId getConnectionId() { - return remoteId; - } - } - - // for unit testing only - @InterfaceAudience.Private - @InterfaceStability.Unstable - static Client getClient(Configuration conf) { - return CLIENTS.getClient(conf); - } - - /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param */ - @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy) - throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); - } - - /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param */ - @Override - @SuppressWarnings("unchecked") - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) - throws IOException { - - if (connectionRetryPolicy != null) { - throw new UnsupportedOperationException( - "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); - } - - T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), - new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, - factory, rpcTimeout, fallbackToSimpleAuth)); - return new ProtocolProxy(protocol, proxy, true); - } - - /* Construct a server for a protocol implementation instance listening on a - * port and address. */ - @Override - public RPC.Server getServer(Class protocolClass, - Object protocolImpl, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, Configuration conf, - SecretManager secretManager, - String portRangeConfig) - throws IOException { - return new Server(protocolClass, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, - portRangeConfig); - } - - - /** An RPC Server. */ - public static class Server extends RPC.Server { - /** - * Construct an RPC server. - * @param instance the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * - * @deprecated Use #Server(Class, Object, Configuration, String, int) - */ - @Deprecated - public Server(Object instance, Configuration conf, String bindAddress, - int port) throws IOException { - this(null, instance, conf, bindAddress, port); - } - - - /** Construct an RPC server. - * @param protocolClass class - * @param protocolImpl the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - */ - public Server(Class protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port) - throws IOException { - this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, - false, null, null); - } - - /** - * Construct an RPC server. - * @param protocolImpl the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * - * @deprecated use Server#Server(Class, Object, - * Configuration, String, int, int, int, int, boolean, SecretManager) - */ - @Deprecated - public Server(Object protocolImpl, Configuration conf, String bindAddress, - int port, int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager) - throws IOException { - this(null, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, - secretManager, null); - - } - - /** - * Construct an RPC server. - * @param protocolClass - the protocol being registered - * can be null for compatibility with old usage (see below for details) - * @param protocolImpl the protocol impl that will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - */ - public Server(Class protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager, - String portRangeConfig) - throws IOException { - super(bindAddress, port, null, numHandlers, numReaders, - queueSizePerHandler, conf, - classNameBase(protocolImpl.getClass().getName()), secretManager, - portRangeConfig); - - this.verbose = verbose; - - - Class[] protocols; - if (protocolClass == null) { // derive protocol from impl - /* - * In order to remain compatible with the old usage where a single - * target protocolImpl is suppled for all protocol interfaces, and - * the protocolImpl is derived from the protocolClass(es) - * we register all interfaces extended by the protocolImpl - */ - protocols = RPC.getProtocolInterfaces(protocolImpl.getClass()); - - } else { - if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { - throw new IOException("protocolClass "+ protocolClass + - " is not implemented by protocolImpl which is of class " + - protocolImpl.getClass()); - } - // register protocol class and its super interfaces - registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); - protocols = RPC.getProtocolInterfaces(protocolClass); - } - for (Class p : protocols) { - if (!p.equals(VersionedProtocol.class)) { - registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl); - } - } - - } - - private static void log(String value) { - if (value!= null && value.length() > 55) - value = value.substring(0, 55)+"..."; - LOG.info(value); - } - - static class WritableRpcInvoker implements RpcInvoker { - - @Override - public Writable call(org.apache.hadoop.ipc.RPC.Server server, - String protocolName, Writable rpcRequest, long receivedTime) - throws IOException, RPC.VersionMismatch { - - Invocation call = (Invocation)rpcRequest; - if (server.verbose) log("Call: " + call); - - // Verify writable rpc version - if (call.getRpcVersion() != writableRpcVersion) { - // Client is using a different version of WritableRpc - throw new RpcServerException( - "WritableRpc version mismatch, client side version=" - + call.getRpcVersion() + ", server side version=" - + writableRpcVersion); - } - - long clientVersion = call.getProtocolVersion(); - final String protoName; - ProtoClassProtoImpl protocolImpl; - if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { - // VersionProtocol methods are often used by client to figure out - // which version of protocol to use. - // - // Versioned protocol methods should go the protocolName protocol - // rather than the declaring class of the method since the - // the declaring class is VersionedProtocol which is not - // registered directly. - // Send the call to the highest protocol version - VerProtocolImpl highest = server.getHighestSupportedProtocol( - RPC.RpcKind.RPC_WRITABLE, protocolName); - if (highest == null) { - throw new RpcServerException("Unknown protocol: " + protocolName); - } - protocolImpl = highest.protocolTarget; - } else { - protoName = call.declaringClassProtocolName; - - // Find the right impl for the protocol based on client version. - ProtoNameVer pv = - new ProtoNameVer(call.declaringClassProtocolName, clientVersion); - protocolImpl = - server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv); - if (protocolImpl == null) { // no match for Protocol AND Version - VerProtocolImpl highest = - server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, - protoName); - if (highest == null) { - throw new RpcServerException("Unknown protocol: " + protoName); - } else { // protocol supported but not the version that client wants - throw new RPC.VersionMismatch(protoName, clientVersion, - highest.version); - } - } - } - - // Invoke the protocol method - long startTime = Time.now(); - int qTime = (int) (startTime-receivedTime); - Exception exception = null; - try { - Method method = - protocolImpl.protocolClass.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - server.rpcDetailedMetrics.init(protocolImpl.protocolClass); - Object value = - method.invoke(protocolImpl.protocolImpl, call.getParameters()); - if (server.verbose) log("Return: "+value); - return new ObjectWritable(method.getReturnType(), value); - - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - exception = (IOException)target; - throw (IOException)target; - } else { - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - exception = ioe; - throw ioe; - } - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - exception = ioe; - throw ioe; - } finally { - int processingTime = (int) (Time.now() - startTime); - if (LOG.isDebugEnabled()) { - String msg = "Served: " + call.getMethodName() + - " queueTime= " + qTime + " procesingTime= " + processingTime; - if (exception != null) { - msg += " exception= " + exception.getClass().getSimpleName(); - } - LOG.debug(msg); - } - String detailedMetricsName = (exception == null) ? - call.getMethodName() : - exception.getClass().getSimpleName(); - server.updateMetrics(detailedMetricsName, qTime, processingTime); - } - } - } - } - - @Override - public ProtocolProxy getProtocolMetaInfoProxy( - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - throw new UnsupportedOperationException("This proxy is not supported"); - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 798aa01f29..aa334f3481 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -689,7 +689,7 @@ public static UserGroupInformation getBestUGI( * * @param user The principal name to load from the ticket * cache - * @param ticketCachePath the path to the ticket cache file + * @param ticketCache the path to the ticket cache file * * @throws IOException if the kerberos login fails */ @@ -749,7 +749,7 @@ public static UserGroupInformation getUGIFromTicketCache( /** * Create a UserGroupInformation from a Subject with Kerberos principal. * - * @param user The KerberosPrincipal to use in UGI + * @param subject The KerberosPrincipal to use in UGI * * @throws IOException if the kerberos login fails */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 1a5acbab6e..04e14e8bd4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -146,7 +146,6 @@ public static UserGroupInformation getUgi(UserInformationProto userInfo) { static RpcKindProto convert(RPC.RpcKind kind) { switch (kind) { case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; - case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE; case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; } return null; @@ -156,7 +155,6 @@ static RpcKindProto convert(RPC.RpcKind kind) { public static RPC.RpcKind convert( RpcKindProto kind) { switch (kind) { case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; - case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE; case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; } return null; diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index aa14616289..f1a36aef74 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -44,10 +44,10 @@ package hadoop.common; /** * RpcKind determine the rpcEngine and the serialization of the rpc request + * Note: 1 for RPC_WRITABLE, WritableRpcEngine, obsolete and removed */ enum RpcKindProto { RPC_BUILTIN = 0; // Used for built in calls by tests - RPC_WRITABLE = 1; // Use WritableRpcEngine RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java index eb7b949709..9356dabe2f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java @@ -17,13 +17,8 @@ */ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.atomic.AtomicLong; - +import com.google.common.base.Joiner; +import com.google.protobuf.BlockingService; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -34,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -45,8 +39,12 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.base.Joiner; -import com.google.protobuf.BlockingService; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicLong; /** * Benchmark for protobuf RPC. @@ -68,7 +66,7 @@ private static class MyOptions { public int secondsToRun = 15; private int msgSize = 1024; public Class rpcEngine = - WritableRpcEngine.class; + ProtobufRpcEngine.class; private MyOptions(String args[]) { try { @@ -135,7 +133,7 @@ private Options buildOptions() { opts.addOption( OptionBuilder.withLongOpt("engine").hasArg(true) - .withArgName("writable|protobuf") + .withArgName("protobuf") .withDescription("engine to use") .create('e')); @@ -184,8 +182,6 @@ private void processOptions(CommandLine line, Options opts) String eng = line.getOptionValue('e'); if ("protobuf".equals(eng)) { rpcEngine = ProtobufRpcEngine.class; - } else if ("writable".equals(eng)) { - rpcEngine = WritableRpcEngine.class; } else { throw new ParseException("invalid engine: " + eng); } @@ -237,11 +233,6 @@ private Server startServer(MyOptions opts) throws IOException { server = new RPC.Builder(conf).setProtocol(TestRpcService.class) .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort()) .setNumHandlers(opts.serverThreads).setVerbose(false).build(); - } else if (opts.rpcEngine == WritableRpcEngine.class) { - server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host) - .setPort(opts.getPort()).setNumHandlers(opts.serverThreads) - .setVerbose(false).build(); } else { throw new RuntimeException("Bad engine: " + opts.rpcEngine); } @@ -399,15 +390,6 @@ public String doEcho(String msg) throws Exception { return responseProto.getMessage(); } }; - } else if (opts.rpcEngine == WritableRpcEngine.class) { - final TestProtocol proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - return new RpcServiceWrapper() { - @Override - public String doEcho(String msg) throws Exception { - return proxy.echo(msg); - } - }; } else { throw new RuntimeException("unsupported engine: " + opts.rpcEngine); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index 8b419e36d4..10e23baefe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -17,252 +17,28 @@ */ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; -import org.apache.hadoop.net.NetUtils; -import org.junit.Before; import org.junit.After; +import org.junit.Before; import org.junit.Test; -import com.google.protobuf.BlockingService; public class TestMultipleProtocolServer extends TestRpcBase { - private static InetSocketAddress addr; + private static RPC.Server server; - private static Configuration conf = new Configuration(); - - - @ProtocolInfo(protocolName="Foo") - interface Foo0 extends VersionedProtocol { - public static final long versionID = 0L; - String ping() throws IOException; - - } - - @ProtocolInfo(protocolName="Foo") - interface Foo1 extends VersionedProtocol { - public static final long versionID = 1L; - String ping() throws IOException; - String ping2() throws IOException; - } - - @ProtocolInfo(protocolName="Foo") - interface FooUnimplemented extends VersionedProtocol { - public static final long versionID = 2L; - String ping() throws IOException; - } - - interface Mixin extends VersionedProtocol{ - public static final long versionID = 0L; - void hello() throws IOException; - } - - interface Bar extends Mixin { - public static final long versionID = 0L; - int echo(int i) throws IOException; - } - - class Foo0Impl implements Foo0 { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Foo0.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public String ping() { - return "Foo0"; - } - - } - - class Foo1Impl implements Foo1 { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Foo1.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public String ping() { - return "Foo1"; - } - - @Override - public String ping2() { - return "Foo1"; - - } - - } - - - class BarImpl implements Bar { - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return Bar.versionID; - } - - @SuppressWarnings("unchecked") - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)getClass(). - getGenericInterfaces()[0]; - } catch (Exception e) { - throw new IOException(e); - } - return ProtocolSignature.getProtocolSignature(clientMethodsHash, - getProtocolVersion(protocol, clientVersion), inter); - } - - @Override - public int echo(int i) { - return i; - } - - @Override - public void hello() { - - - } - } @Before public void setUp() throws Exception { - // create a server with two handlers - server = new RPC.Builder(conf).setProtocol(Foo0.class) - .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); - - - // Add Protobuf server - // Create server side implementation - PBServerImpl pbServerImpl = new PBServerImpl(); - BlockingService service = TestProtobufRpcProto - .newReflectiveBlockingService(pbServerImpl); - server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, - service); - server.start(); - addr = NetUtils.getConnectAddress(server); + super.setupConf(); + + server = setupTestServer(conf, 2); } - + @After public void tearDown() throws Exception { server.stop(); } - @Test - public void test1() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); - Foo0 foo0 = (Foo0)proxy.getProxy(); - Assert.assertEquals("Foo0", foo0.ping()); - - - proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); - - - Foo1 foo1 = (Foo1)proxy.getProxy(); - Assert.assertEquals("Foo1", foo1.ping()); - Assert.assertEquals("Foo1", foo1.ping()); - - - proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); - - - Bar bar = (Bar)proxy.getProxy(); - Assert.assertEquals(99, bar.echo(99)); - - // Now test Mixin class method - - Mixin mixin = bar; - mixin.hello(); - } - - - // Server does not implement the FooUnimplemented version of protocol Foo. - // See that calls to it fail. - @Test(expected=IOException.class) - public void testNonExistingProtocol() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(FooUnimplemented.class, - FooUnimplemented.versionID, addr, conf); - - FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); - foo.ping(); - } - - /** - * getProtocolVersion of an unimplemented version should return highest version - * Similarly getProtocolSignature should work. - * @throws IOException - */ - @Test - public void testNonExistingProtocol2() throws IOException { - ProtocolProxy proxy; - proxy = RPC.getProtocolProxy(FooUnimplemented.class, - FooUnimplemented.versionID, addr, conf); - - FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); - Assert.assertEquals(Foo1.versionID, - foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), - FooUnimplemented.versionID)); - foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), - FooUnimplemented.versionID, 0); - } - - @Test(expected=IOException.class) - public void testIncorrectServerCreation() throws IOException { - new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false) - .build(); - } - // Now test a PB service - a server hosts both PB and Writable Rpcs. @Test public void testPBService() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java index 969f728f77..6d83d7d368 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java @@ -25,19 +25,6 @@ public class TestRPCCallBenchmark { - @Test(timeout=20000) - public void testBenchmarkWithWritable() throws Exception { - int rc = ToolRunner.run(new RPCCallBenchmark(), - new String[] { - "--clientThreads", "30", - "--serverThreads", "30", - "--time", "5", - "--serverReaderThreads", "4", - "--messageSize", "1024", - "--engine", "writable"}); - assertEquals(0, rc); - } - @Test(timeout=20000) public void testBenchmarkWithProto() throws Exception { int rc = ToolRunner.run(new RPCCallBenchmark(), diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index 2ac2be990d..a06d9fdc01 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -18,27 +18,19 @@ package org.apache.hadoop.ipc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import org.junit.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; -import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; -import org.apache.hadoop.net.NetUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; /** Unit test for supporting method-name based compatible RPCs. */ public class TestRPCCompatibility { @@ -49,7 +41,7 @@ public class TestRPCCompatibility { public static final Log LOG = LogFactory.getLog(TestRPCCompatibility.class); - + private static Configuration conf = new Configuration(); public interface TestProtocol0 extends VersionedProtocol { @@ -120,6 +112,21 @@ public long getProtocolVersion(String protocol, @Before public void setUp() { ProtocolSignature.resetCache(); + + RPC.setProtocolEngine(conf, + TestProtocol0.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol1.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol2.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol3.class, ProtobufRpcEngine.class); + + RPC.setProtocolEngine(conf, + TestProtocol4.class, ProtobufRpcEngine.class); } @After @@ -133,117 +140,7 @@ public void tearDown() { server = null; } } - - @Test // old client vs new server - public void testVersion0ClientVersion1Server() throws Exception { - // create a server with two handlers - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - proxy = RPC.getProtocolProxy( - TestProtocol0.class, TestProtocol0.versionID, addr, conf); - - TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy(); - proxy0.ping(); - } - - @Test // old client vs new server - public void testVersion1ClientVersion0Server() throws Exception { - // create a server with two handlers - server = new RPC.Builder(conf).setProtocol(TestProtocol0.class) - .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - proxy = RPC.getProtocolProxy( - TestProtocol1.class, TestProtocol1.versionID, addr, conf); - - TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy(); - proxy1.ping(); - try { - proxy1.echo("hello"); - fail("Echo should fail"); - } catch(IOException e) { - } - } - - private class Version2Client { - - private TestProtocol2 proxy2; - private ProtocolProxy serverInfo; - - private Version2Client() throws IOException { - serverInfo = RPC.getProtocolProxy( - TestProtocol2.class, TestProtocol2.versionID, addr, conf); - proxy2 = serverInfo.getProxy(); - } - - public int echo(int value) throws IOException, NumberFormatException { - if (serverInfo.isMethodSupported("echo", int.class)) { -System.out.println("echo int is supported"); - return -value; // use version 3 echo long - } else { // server is version 2 -System.out.println("echo int is NOT supported"); - return Integer.parseInt(proxy2.echo(String.valueOf(value))); - } - } - - public String echo(String value) throws IOException { - return proxy2.echo(value); - } - - public void ping() throws IOException { - proxy2.ping(); - } - } - - @Test // Compatible new client & old server - public void testVersion2ClientVersion1Server() throws Exception { - // create a server with two handlers - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - - - Version2Client client = new Version2Client(); - client.ping(); - assertEquals("hello", client.echo("hello")); - - // echo(int) is not supported by server, so returning 3 - // This verifies that echo(int) and echo(String)'s hash codes are different - assertEquals(3, client.echo(3)); - } - - @Test // equal version client and server - public void testVersion2ClientVersion2Server() throws Exception { - // create a server with two handlers - TestImpl2 impl = new TestImpl2(); - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - addr = NetUtils.getConnectAddress(server); - - Version2Client client = new Version2Client(); - - client.ping(); - assertEquals("hello", client.echo("hello")); - - // now that echo(int) is supported by the server, echo(int) should return -3 - assertEquals(-3, client.echo(3)); - } - public interface TestProtocol3 { int echo(String value); int echo(int value); @@ -297,97 +194,4 @@ public interface TestProtocol4 extends TestProtocol2 { @Override int echo(int value) throws IOException; } - - @Test - public void testVersionMismatch() throws IOException { - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class, - TestProtocol4.versionID, addr, conf); - try { - proxy.echo(21); - fail("The call must throw VersionMismatch exception"); - } catch (RemoteException ex) { - Assert.assertEquals(RPC.VersionMismatch.class.getName(), - ex.getClassName()); - Assert.assertTrue(ex.getErrorCode().equals( - RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH)); - } catch (IOException ex) { - fail("Expected version mismatch but got " + ex); - } - } - - @Test - public void testIsMethodSupported() throws IOException { - server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) - .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - server.start(); - addr = NetUtils.getConnectAddress(server); - - TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, - TestProtocol2.versionID, addr, conf); - boolean supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, - RPC.getProtocolVersion(TestProtocol2.class), "echo"); - Assert.assertTrue(supported); - supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, - RPC.getProtocolVersion(TestProtocol2.class), "echo"); - Assert.assertFalse(supported); - } - - /** - * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up - * the server registry to extract protocol signatures and versions. - */ - @Test - public void testProtocolMetaInfoSSTranslatorPB() throws Exception { - TestImpl1 impl = new TestImpl1(); - server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) - .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) - .setVerbose(false).build(); - server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); - server.start(); - - ProtocolMetaInfoServerSideTranslatorPB xlator = - new ProtocolMetaInfoServerSideTranslatorPB(server); - - GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( - null, - createGetProtocolSigRequestProto(TestProtocol1.class, - RPC.RpcKind.RPC_PROTOCOL_BUFFER)); - //No signatures should be found - Assert.assertEquals(0, resp.getProtocolSignatureCount()); - resp = xlator.getProtocolSignature( - null, - createGetProtocolSigRequestProto(TestProtocol1.class, - RPC.RpcKind.RPC_WRITABLE)); - Assert.assertEquals(1, resp.getProtocolSignatureCount()); - ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); - Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); - boolean found = false; - int expected = ProtocolSignature.getFingerprint(TestProtocol1.class - .getMethod("echo", String.class)); - for (int m : sig.getMethodsList()) { - if (expected == m) { - found = true; - break; - } - } - Assert.assertTrue(found); - } - - private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( - Class protocol, RPC.RpcKind rpcKind) { - GetProtocolSignatureRequestProto.Builder builder = - GetProtocolSignatureRequestProto.newBuilder(); - builder.setProtocol(protocol.getName()); - builder.setRpcKind(rpcKind.toString()); - return builder.build(); - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java index 5807998a15..b22f91b8e8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; -import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -30,11 +28,13 @@ import java.net.InetSocketAddress; import java.nio.channels.ClosedByInterruptException; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; + /** * tests that the proxy can be interrupted */ -public class TestRPCWaitForProxy extends Assert { - private static final String ADDRESS = "0.0.0.0"; +public class TestRPCWaitForProxy extends TestRpcBase { private static final Logger LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); @@ -46,14 +46,15 @@ public class TestRPCWaitForProxy extends Assert { * * @throws Throwable any exception other than that which was expected */ - @Test(timeout = 10000) + @Test(timeout = 50000) public void testWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(0); worker.start(); worker.join(); Throwable caught = worker.getCaught(); - assertNotNull("No exception was raised", caught); - if (!(caught instanceof ConnectException)) { + Throwable cause = caught.getCause(); + Assert.assertNotNull("No exception was raised", cause); + if (!(cause instanceof ConnectException)) { throw caught; } } @@ -69,11 +70,11 @@ public void testInterruptedWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(100); worker.start(); Thread.sleep(1000); - assertTrue("worker hasn't started", worker.waitStarted); + Assert.assertTrue("worker hasn't started", worker.waitStarted); worker.interrupt(); worker.join(); Throwable caught = worker.getCaught(); - assertNotNull("No exception was raised", caught); + Assert.assertNotNull("No exception was raised", caught); // looking for the root cause here, which can be wrapped // as part of the NetUtils work. Having this test look // a the type of exception there would be brittle to improvements @@ -82,6 +83,8 @@ public void testInterruptedWaitForProxy() throws Throwable { if (cause == null) { // no inner cause, use outer exception as root cause. cause = caught; + } else if (cause.getCause() != null) { + cause = cause.getCause(); } if (!(cause instanceof InterruptedIOException) && !(cause instanceof ClosedByInterruptException)) { @@ -112,12 +115,16 @@ public void run() { IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, connectRetries); waitStarted = true; - TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, - TestProtocol.versionID, - new InetSocketAddress(ADDRESS, 20), - config, - 15000L); - proxy.echo(""); + + short invalidPort = 20; + InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, + invalidPort); + TestRpcBase.TestRpcService proxy = RPC.getProxy( + TestRpcBase.TestRpcService.class, + 1L, invalidAddress, conf); + // Test echo method + proxy.echo(null, newEchoRequest("hello")); + } catch (Throwable throwable) { caught = throwable; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index bc604a47ef..5a8f8d0124 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -112,7 +112,8 @@ protected static RPC.Server setupTestServer(Configuration serverConf, return setupTestServer(builder); } - protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException { + protected static RPC.Server setupTestServer( + RPC.Builder builder) throws IOException { RPC.Server server = builder.build(); server.start(); @@ -175,17 +176,21 @@ public static class TestTokenIdentifier extends TokenIdentifier { public TestTokenIdentifier() { this(new Text(), new Text()); } + public TestTokenIdentifier(Text tokenid) { this(tokenid, new Text()); } + public TestTokenIdentifier(Text tokenid, Text realUser) { this.tokenid = tokenid == null ? new Text() : tokenid; this.realUser = realUser == null ? new Text() : realUser; } + @Override public Text getKind() { return KIND_NAME; } + @Override public UserGroupInformation getUser() { if (realUser.toString().isEmpty()) { @@ -203,6 +208,7 @@ public void readFields(DataInput in) throws IOException { tokenid.readFields(in); realUser.readFields(in); } + @Override public void write(DataOutput out) throws IOException { tokenid.write(out); @@ -234,7 +240,7 @@ public static class TestTokenSelector implements @SuppressWarnings("unchecked") @Override public Token selectToken(Text service, - Collection> tokens) { + Collection> tokens) { if (service == null) { return null; } @@ -388,19 +394,17 @@ public TestProtos.AuthMethodResponseProto getAuthMethod( } @Override - public TestProtos.AuthUserResponseProto getAuthUser( + public TestProtos.UserResponseProto getAuthUser( RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException { - UserGroupInformation authUser = null; + UserGroupInformation authUser; try { authUser = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new ServiceException(e); } - return TestProtos.AuthUserResponseProto.newBuilder() - .setAuthUser(authUser.getUserName()) - .build(); + return newUserResponse(authUser.getUserName()); } @Override @@ -432,6 +436,34 @@ public TestProtos.EmptyResponseProto sendPostponed( return TestProtos.EmptyResponseProto.newBuilder().build(); } + + @Override + public TestProtos.UserResponseProto getCurrentUser( + RpcController controller, + TestProtos.EmptyRequestProto request) throws ServiceException { + String user; + try { + user = UserGroupInformation.getCurrentUser().toString(); + } catch (IOException e) { + throw new ServiceException("Failed to get current user", e); + } + + return newUserResponse(user); + } + + @Override + public TestProtos.UserResponseProto getServerRemoteUser( + RpcController controller, + TestProtos.EmptyRequestProto request) throws ServiceException { + String serverRemoteUser = Server.getRemoteUser().toString(); + return newUserResponse(serverRemoteUser); + } + + private TestProtos.UserResponseProto newUserResponse(String user) { + return TestProtos.UserResponseProto.newBuilder() + .setUser(user) + .build(); + } } protected static TestProtos.EmptyRequestProto newEmptyRequest() { @@ -478,8 +510,4 @@ protected static AuthMethod convert( } return null; } - - protected static String convert(TestProtos.AuthUserResponseProto response) { - return response.getAuthUser(); - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index ec53e8c976..3809448ad4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -29,12 +29,25 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.*; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslPlainServer; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcClient; +import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.TestUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.token.*; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; @@ -44,30 +57,55 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.security.auth.callback.*; -import javax.security.sasl.*; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; import java.io.IOException; import java.lang.annotation.Annotation; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.security.Security; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*; -import static org.junit.Assert.*; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** Unit tests for using Sasl over RPC. */ @RunWith(Parameterized.class) public class TestSaslRPC extends TestRpcBase { @Parameters public static Collection data() { - Collection params = new ArrayList(); + Collection params = new ArrayList<>(); for (QualityOfProtection qop : QualityOfProtection.values()) { params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null }); } @@ -113,7 +151,7 @@ enum UseToken { NONE(), VALID(), INVALID(), - OTHER(); + OTHER() } @BeforeClass @@ -229,7 +267,7 @@ public void testDigestRpcWithoutAnnotation() throws Exception { final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } finally { - SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); + SecurityUtil.setSecurityInfoProviders(); } } @@ -258,7 +296,7 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm) addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token(tokenId, sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); @@ -286,8 +324,8 @@ public void testPingInterval() throws Exception { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), - TestRpcService.class, null, 0, null, newConf); + ConnectionId remoteId = ConnectionId.getConnectionId( + new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false @@ -796,13 +834,13 @@ private String internalGetAuthMethod( final TestTokenSecretManager sm = new TestTokenSecretManager(); boolean useSecretManager = (serverAuth != SIMPLE); if (enableSecretManager != null) { - useSecretManager &= enableSecretManager.booleanValue(); + useSecretManager &= enableSecretManager; } if (forceSecretManager != null) { - useSecretManager |= forceSecretManager.booleanValue(); + useSecretManager |= forceSecretManager; } final SecretManager serverSm = useSecretManager ? sm : null; - + Server server = serverUgi.doAs(new PrivilegedExceptionAction() { @Override public Server run() throws IOException { @@ -857,13 +895,13 @@ public String run() throws IOException { proxy.ping(null, newEmptyRequest()); // make sure the other side thinks we are who we said we are!!! assertEquals(clientUgi.getUserName(), - convert(proxy.getAuthUser(null, newEmptyRequest()))); + proxy.getAuthUser(null, newEmptyRequest()).getUser()); AuthMethod authMethod = convert(proxy.getAuthMethod(null, newEmptyRequest())); // verify sasl completed with correct QOP assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, - RPC.getConnectionIdForProxy(proxy).getSaslQop()); - return authMethod.toString(); + RPC.getConnectionIdForProxy(proxy).getSaslQop()); + return authMethod != null ? authMethod.toString() : null; } catch (ServiceException se) { if (se.getCause() instanceof RemoteException) { throw (RemoteException) se.getCause(); @@ -888,21 +926,18 @@ private static void assertAuthEquals(AuthMethod expect, String actual) { assertEquals(expect.toString(), actual); } - - private static void assertAuthEquals(Pattern expect, - String actual) { + + private static void assertAuthEquals(Pattern expect, String actual) { // this allows us to see the regexp and the value it didn't match if (!expect.matcher(actual).matches()) { - assertEquals(expect, actual); // it failed - } else { - assertTrue(true); // it matched + fail(); // it failed } } /* * Class used to test overriding QOP values using SaslPropertiesResolver */ - static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{ + static class AuthSaslPropertiesResolver extends SaslPropertiesResolver { @Override public Map getServerProperties(InetAddress address) { @@ -911,7 +946,7 @@ public Map getServerProperties(InetAddress address) { return newPropertes; } } - + public static void main(String[] args) throws Exception { System.out.println("Testing Kerberos authentication over RPC"); if (args.length != 2) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index 50d389c646..c4dbcac4c2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -17,40 +17,35 @@ */ package org.apache.hadoop.security; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenInfo; -import org.junit.Before; -import org.junit.Test; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector; -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; - /** - * + * Test do as effective user. */ -public class TestDoAsEffectiveUser { +public class TestDoAsEffectiveUser extends TestRpcBase { final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG"; final private static String REAL_USER_SHORT_NAME = "realUser1"; final private static String PROXY_USER_NAME = "proxyUser"; @@ -58,8 +53,8 @@ public class TestDoAsEffectiveUser { final private static String GROUP2_NAME = "group2"; final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME, GROUP2_NAME }; - private static final String ADDRESS = "0.0.0.0"; - private TestProtocol proxy; + + private TestRpcService client; private static final Configuration masterConf = new Configuration(); @@ -82,7 +77,7 @@ public void setMasterConf() throws IOException { private void configureSuperUserIPAddresses(Configuration conf, String superUserShortName) throws IOException { - ArrayList ipList = new ArrayList(); + ArrayList ipList = new ArrayList<>(); Enumeration netInterfaceList = NetworkInterface .getNetworkInterfaces(); while (netInterfaceList.hasMoreElements()) { @@ -130,50 +125,19 @@ public UserGroupInformation run() throws IOException { curUGI.toString()); } - @TokenInfo(TestTokenSelector.class) - public interface TestProtocol extends VersionedProtocol { - public static final long versionID = 1L; - - String aMethod() throws IOException; - String getServerRemoteUser() throws IOException; - } - - public class TestImpl implements TestProtocol { - - @Override - public String aMethod() throws IOException { - return UserGroupInformation.getCurrentUser().toString(); - } - - @Override - public String getServerRemoteUser() throws IOException { - return Server.getRemoteUser().toString(); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return TestProtocol.versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return new ProtocolSignature(TestProtocol.versionID, null); - } - } - - private void checkRemoteUgi(final Server server, - final UserGroupInformation ugi, final Configuration conf) - throws Exception { + private void checkRemoteUgi(final UserGroupInformation ugi, + final Configuration conf) throws Exception { ugi.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws IOException { - proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, - NetUtils.getConnectAddress(server), conf); - Assert.assertEquals(ugi.toString(), proxy.aMethod()); - Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser()); + public Void run() throws ServiceException { + client = getClient(addr, conf); + String currentUser = client.getCurrentUser(null, + newEmptyRequest()).getUser(); + String serverRemoteUser = client.getServerRemoteUser(null, + newEmptyRequest()).getUser(); + + Assert.assertEquals(ugi.toString(), currentUser); + Assert.assertEquals(ugi.toString(), serverRemoteUser); return null; } }); @@ -185,29 +149,27 @@ public void testRealUserSetup() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(server, realUserUgi, conf); + checkRemoteUgi(realUserUgi, conf); - UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting( + UserGroupInformation proxyUserUgi = + UserGroupInformation.createProxyUserForTesting( PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(server, proxyUserUgi, conf); + checkRemoteUgi(proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -218,29 +180,25 @@ public void testRealUserAuthorizationSuccess() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(server, realUserUgi, conf); + checkRemoteUgi(realUserUgi, conf); UserGroupInformation proxyUserUgi = UserGroupInformation .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(server, proxyUserUgi, conf); + checkRemoteUgi(proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -256,17 +214,14 @@ public void testRealUserIPAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 5); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -275,11 +230,10 @@ public void testRealUserIPAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -287,10 +241,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -299,17 +250,14 @@ public void testRealUserIPNotSpecified() throws IOException { final Configuration conf = new Configuration(); conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -318,11 +266,10 @@ public void testRealUserIPNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -330,10 +277,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -341,15 +285,12 @@ public String run() throws IOException { public void testRealUserGroupNotSpecified() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -358,11 +299,10 @@ public void testRealUserGroupNotSpecified() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -370,10 +310,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -384,17 +321,14 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group3"); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + final Server server = setupTestServer(conf, 2); refreshConf(conf); try { - server.start(); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -403,11 +337,10 @@ public void testRealUserGroupAuthorizationFailure() throws IOException { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws IOException { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + public String run() throws ServiceException { + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } }); @@ -415,10 +348,7 @@ public String run() throws IOException { } catch (Exception e) { e.printStackTrace(); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } @@ -432,20 +362,17 @@ public void testProxyWithToken() throws Exception { final Configuration conf = new Configuration(masterConf); TestTokenSecretManager sm = new TestTokenSecretManager(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(conf); - final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build(); - - server.start(); + final Server server = setupTestServer(conf, 5, sm); final UserGroupInformation current = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); + TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token(tokenId, + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); UserGroupInformation proxyUserUgi = UserGroupInformation @@ -453,23 +380,19 @@ public void testProxyWithToken() throws Exception { proxyUserUgi.addToken(token); refreshConf(conf); - + String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); - String ret = proxy.aMethod(); - return ret; + client = getClient(addr, conf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } catch (Exception e) { e.printStackTrace(); throw e; } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } }); @@ -486,42 +409,34 @@ public void testTokenBySuperUser() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); final Configuration newConf = new Configuration(masterConf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(newConf, TestRpcService.class, + ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(newConf); - final Server server = new RPC.Builder(newConf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); - - server.start(); + final Server server = setupTestServer(newConf, 5, sm); final UserGroupInformation current = UserGroupInformation .createUserForTesting(REAL_USER_NAME, GROUP_NAMES); refreshConf(newConf); - - final InetSocketAddress addr = NetUtils.getConnectAddress(server); + TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token(tokenId, - sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); String retVal = current.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, newConf); - String ret = proxy.aMethod(); - return ret; + client = getClient(addr, newConf); + return client.getCurrentUser(null, + newEmptyRequest()).getUser(); } catch (Exception e) { e.printStackTrace(); throw e; } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, client); } } }); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java index 91f36e599c..462f0a4822 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -28,7 +29,11 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; @@ -48,9 +53,22 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; -import static org.apache.hadoop.ipc.TestSaslRPC.*; -import static org.apache.hadoop.test.MetricsAsserts.*; -import static org.junit.Assert.*; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; +import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; +import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -107,7 +125,7 @@ public void resetUgi() { UserGroupInformation.setLoginUser(null); } - @Test (timeout = 30000) + @Test(timeout = 30000) public void testSimpleLogin() throws IOException { tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 99cd93d711..6411f97ab6 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -88,6 +88,6 @@ message AuthMethodResponseProto { required string mechanismName = 2; } -message AuthUserResponseProto { - required string authUser = 1; +message UserResponseProto { + required string user = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 3292115885..06f6c4fc1d 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -40,9 +40,11 @@ service TestProtobufRpcProto { rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto); rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); - rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto); + rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto); rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); + rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto); + rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 6b52949868..57f7cb197b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -168,7 +168,6 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; -import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; @@ -317,8 +316,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); - - WritableRpcEngine.ensureInitialized(); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); if (serviceRpcAddr != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java deleted file mode 100644 index 0b7ee337d8..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.security; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.mockito.Mockito.mock; - -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslRpcClient; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.Test; - -/** Unit tests for using Delegation Token over RPC. */ -public class TestClientProtocolWithDelegationToken { - private static final String ADDRESS = "0.0.0.0"; - - public static final Log LOG = LogFactory - .getLog(TestClientProtocolWithDelegationToken.class); - - private static final Configuration conf; - static { - conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(conf); - } - - static { - GenericTestUtils.setLogLevel(Client.LOG, Level.ALL); - GenericTestUtils.setLogLevel(Server.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL); - } - - @Test - public void testDelegationTokenRpc() throws Exception { - ClientProtocol mockNN = mock(ClientProtocol.class); - FSNamesystem mockNameSys = mock(FSNamesystem.class); - - DelegationTokenSecretManager sm = new DelegationTokenSecretManager( - DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, - DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, - DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, - 3600000, mockNameSys); - sm.startThreads(); - final Server server = new RPC.Builder(conf) - .setProtocol(ClientProtocol.class).setInstance(mockNN) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); - - server.start(); - - final UserGroupInformation current = UserGroupInformation.getCurrentUser(); - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - String user = current.getUserName(); - Text owner = new Text(user); - DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null); - Token token = new Token( - dtId, sm); - SecurityUtil.setTokenService(token, addr); - LOG.info("Service for token is " + token.getService()); - current.addToken(token); - current.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - ClientProtocol proxy = null; - try { - proxy = RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, addr, conf); - proxy.getServerDefaults(); - } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } - } - return null; - } - }); - } - -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index 3fef5e278b..729af0a951 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -98,8 +97,6 @@ public void serviceInit(Configuration conf) throws Exception { BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService .newReflectiveBlockingService(refreshHSAdminProtocolXlator); - WritableRpcEngine.ensureInitialized(); - clientRpcAddress = conf.getSocketAddr( JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.JHS_ADMIN_ADDRESS,