diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ce87883546..4764b9db94 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -79,11 +79,19 @@ Trunk (unreleased changes) HADOOP-7899. Generate proto java files as part of the build. (tucu) - HADOOP-7574. Improve FSShell -stat, add user/group elements (XieXianshan via harsh) + HADOOP-7574. Improve FSShell -stat, add user/group elements. + (XieXianshan via harsh) - HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead (XieXianshan via harsh) + HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead. + (XieXianshan via harsh) - HADOOP-7919. Remove the unused hadoop.logfile.* properties from the core-default.xml file. (harsh) + HADOOP-7919. Remove the unused hadoop.logfile.* properties from the + core-default.xml file. (harsh) + + HADOOP-7808. Port HADOOP-7510 - Add configurable option to use original + hostname in token instead of IP to allow server IP change. + (Daryn Sharp via suresh) + BUGS @@ -241,6 +249,9 @@ Release 0.23.1 - Unreleased HADOOP-7948. Shell scripts created by hadoop-dist/pom.xml to build tar do not properly propagate failure. (cim_michajlomatijkiw via tucu) + HADOOP-7949. Updated maxIdleTime default in the code to match + core-default.xml (eli) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 7c9b25c957..f0ca72b00e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -51,7 +51,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** How often does RPC client send pings to RPC server */ public static final String IPC_PING_INTERVAL_KEY = "ipc.ping.interval"; /** Default value for IPC_PING_INTERVAL_KEY */ - public static final int IPC_PING_INTERVAL_DEFAULT = 60000; + public static final int IPC_PING_INTERVAL_DEFAULT = 60000; // 1 min /** Enables pings from RPC client to the server */ public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping"; /** Default value of IPC_CLIENT_PING_KEY */ @@ -114,5 +114,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS = "security.refresh.user.mappings.protocol.acl"; + + public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP = + "hadoop.security.token.service.use_ip"; + public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT = + true; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 534046a9ab..7953411b57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -165,7 +165,7 @@ public class CommonConfigurationKeysPublic { public static final String IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY = "ipc.client.connection.maxidletime"; /** Default value for IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY */ - public static final int IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT = 10000; + public static final int IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT = 10000; // 10s /** See core-default.xml */ public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = "ipc.client.connect.max.retries"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 4fe9d77573..64f7c68a6d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -186,6 +187,15 @@ public void initialize(URI name, Configuration conf) throws IOException { /** Returns a URI whose scheme and authority identify this FileSystem.*/ public abstract URI getUri(); + /** + * Resolve the uri's hostname and add the default port if not in the uri + * @return URI + * @see NetUtils#getCanonicalUri(URI, int) + */ + protected URI getCanonicalUri() { + return NetUtils.getCanonicalUri(getUri(), getDefaultPort()); + } + /** * Get the default port for this file system. * @return the default port or 0 if there isn't one @@ -195,8 +205,13 @@ protected int getDefaultPort() { } /** - * Get a canonical name for this file system. - * @return a URI string that uniquely identifies this file system + * Get a canonical service name for this file system. The token cache is + * the only user of this value, and uses it to lookup this filesystem's + * service tokens. The token cache will not attempt to acquire tokens if the + * service is null. + * @return a service string that uniquely identifies this file system, null + * if the filesystem does not implement tokens + * @see SecurityUtil#buildDTServiceName(URI, int) */ public String getCanonicalServiceName() { return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort()); @@ -487,32 +502,31 @@ protected FileSystem() { */ protected void checkPath(Path path) { URI uri = path.toUri(); - if (uri.getScheme() == null) // fs is relative - return; - String thisScheme = this.getUri().getScheme(); String thatScheme = uri.getScheme(); - String thisAuthority = this.getUri().getAuthority(); - String thatAuthority = uri.getAuthority(); + if (thatScheme == null) // fs is relative + return; + URI thisUri = getCanonicalUri(); + String thisScheme = thisUri.getScheme(); //authority and scheme are not case sensitive if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match - if (thisAuthority == thatAuthority || // & authorities match - (thisAuthority != null && - thisAuthority.equalsIgnoreCase(thatAuthority))) - return; - + String thisAuthority = thisUri.getAuthority(); + String thatAuthority = uri.getAuthority(); if (thatAuthority == null && // path's authority is null thisAuthority != null) { // fs has an authority - URI defaultUri = getDefaultUri(getConf()); // & is the conf default - if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) && - thisAuthority.equalsIgnoreCase(defaultUri.getAuthority())) - return; - try { // or the default fs's uri - defaultUri = get(getConf()).getUri(); - } catch (IOException e) { - throw new RuntimeException(e); + URI defaultUri = getDefaultUri(getConf()); + if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) { + uri = defaultUri; // schemes match, so use this uri instead + } else { + uri = null; // can't determine auth of the path } - if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) && - thisAuthority.equalsIgnoreCase(defaultUri.getAuthority())) + } + if (uri != null) { + // canonicalize uri before comparing with this fs + uri = NetUtils.getCanonicalUri(uri, getDefaultPort()); + thatAuthority = uri.getAuthority(); + if (thisAuthority == thatAuthority || // authorities match + (thisAuthority != null && + thisAuthority.equalsIgnoreCase(thatAuthority))) return; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index f59085c87a..cedf802228 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -77,6 +77,15 @@ public URI getUri() { return fs.getUri(); } + /** + * Returns a qualified URI whose scheme and authority identify this + * FileSystem. + */ + @Override + protected URI getCanonicalUri() { + return fs.getCanonicalUri(); + } + /** Make sure that a path specifies a FileSystem. */ public Path makeQualified(Path path) { return fs.makeQualified(path); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index c636493911..5fe97eac1b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.io.IOUtils; @@ -88,8 +89,6 @@ public class Client { private SocketFactory socketFactory; // how to create sockets private int refCount = 1; - final static String PING_INTERVAL_NAME = "ipc.ping.interval"; - final static int DEFAULT_PING_INTERVAL = 60000; // 1 min final static int PING_CALL_ID = -1; /** @@ -99,7 +98,7 @@ public class Client { * @param pingInterval the ping interval */ final public static void setPingInterval(Configuration conf, int pingInterval) { - conf.setInt(PING_INTERVAL_NAME, pingInterval); + conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval); } /** @@ -110,7 +109,8 @@ final public static void setPingInterval(Configuration conf, int pingInterval) { * @return the ping interval */ final static int getPingInterval(Configuration conf) { - return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL); + return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, + CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); } /** @@ -123,7 +123,7 @@ final static int getPingInterval(Configuration conf) { * @return the timeout period in milliseconds. -1 if no timeout value is set */ final public static int getTimeout(Configuration conf) { - if (!conf.getBoolean("ipc.client.ping", true)) { + if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true)) { return getPingInterval(conf); } return -1; @@ -425,7 +425,7 @@ private synchronized boolean setupSaslConnection(final InputStream in2, */ private synchronized boolean updateAddress() throws IOException { // Do a fresh lookup with the old host name. - InetSocketAddress currentAddr = new InetSocketAddress( + InetSocketAddress currentAddr = NetUtils.createSocketAddrForHost( server.getHostName(), server.getPort()); if (!server.equals(currentAddr)) { @@ -1347,15 +1347,19 @@ public static ConnectionId getConnectionId(InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws IOException { String remotePrincipal = getRemotePrincipal(conf, addr, protocol); - boolean doPing = conf.getBoolean("ipc.client.ping", true); + boolean doPing = + conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); return new ConnectionId(addr, protocol, ticket, rpcTimeout, remotePrincipal, - conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s - conf.getInt("ipc.client.connect.max.retries", 10), conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT), - conf.getBoolean("ipc.client.tcpnodelay", false), + conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT), + conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT), + conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT), doPing, (doPing ? Client.getPingInterval(conf) : 0)); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index b9220a6df5..8046786685 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -62,6 +62,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -378,7 +379,9 @@ private class Listener extends Thread { //-tion (for idle connections) ran private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs - private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); + private int backlogLength = conf.getInt( + CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -1712,12 +1715,18 @@ protected Server(String bindAddress, int port, } else { this.readThreads = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); } this.callQueue = new LinkedBlockingQueue(maxQueueSize); - this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); - this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); - this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); + this.maxIdleTime = 2 * conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT); + this.maxConnectionsToNuke = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT); + this.thresholdIdleConnections = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT); this.secretManager = (SecretManager) secretManager; this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, @@ -1729,7 +1738,9 @@ protected Server(String bindAddress, int port, this.port = listener.getAddress().getPort(); this.rpcMetrics = RpcMetrics.create(this); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); - this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); + this.tcpNoDelay = conf.getBoolean( + CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT); // Create the responder here responder = new Responder(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java index e68325b09d..166a846fdf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.net.NetUtils; /** * Static utility methods @@ -56,14 +57,7 @@ public static List parse(String specs, int defaultPort) { else { String[] specStrings = specs.split("[ ,]+"); for (String specString : specStrings) { - int colon = specString.indexOf(':'); - if (colon < 0 || colon == specString.length() - 1) { - result.add(new InetSocketAddress(specString, defaultPort)); - } else { - String hostname = specString.substring(0, colon); - int port = Integer.parseInt(specString.substring(colon+1)); - result.add(new InetSocketAddress(hostname, port)); - } + result.add(NetUtils.createSocketAddr(specString, defaultPort)); } } return result; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java index 2ddf3c5fb5..aa5d715078 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.net.NetUtils; /** * Helpers to handle server addresses @@ -57,14 +58,7 @@ public static List parse(String specs, int defaultPort) { else { String[] specStrings = specs.split("[ ,]+"); for (String specString : specStrings) { - int colon = specString.indexOf(':'); - if (colon < 0 || colon == specString.length() - 1) { - result.add(new InetSocketAddress(specString, defaultPort)); - } else { - String hostname = specString.substring(0, colon); - int port = Integer.parseInt(specString.substring(colon+1)); - result.add(new InetSocketAddress(hostname, port)); - } + result.add(NetUtils.createSocketAddr(specString, defaultPort)); } } return result; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index ceaccb285b..752c0be8bc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -37,6 +37,7 @@ import java.util.Map.Entry; import java.util.regex.Pattern; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import javax.net.SocketFactory; @@ -45,11 +46,17 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.annotations.VisibleForTesting; + +//this will need to be replaced someday when there is a suitable replacement +import sun.net.dns.ResolverConfiguration; +import sun.net.util.IPAddressUtil; + @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Unstable public class NetUtils { @@ -65,6 +72,26 @@ public class NetUtils { /** Base URL of the Hadoop Wiki: {@value} */ public static final String HADOOP_WIKI = "http://wiki.apache.org/hadoop/"; + private static HostResolver hostResolver; + + static { + // SecurityUtils requires a more secure host resolver if tokens are + // using hostnames + setUseQualifiedHostResolver(!SecurityUtil.getTokenServiceUseIp()); + } + + /** + * This method is intended for use only by SecurityUtils! + * @param flag where the qualified or standard host resolver is used + * to create socket addresses + */ + @InterfaceAudience.Private + public static void setUseQualifiedHostResolver(boolean flag) { + hostResolver = flag + ? new QualifiedHostResolver() + : new StandardHostResolver(); + } + /** * Get the socket factory for the given class according to its * configuration parameter @@ -178,43 +205,256 @@ public static InetSocketAddress createSocketAddr(String target, throw new IllegalArgumentException("Target address cannot be null." + helpText); } - int colonIndex = target.indexOf(':'); - if (colonIndex < 0 && defaultPort == -1) { - throw new RuntimeException("Not a host:port pair: " + target + - helpText); - } - String hostname; - int port = -1; - if (!target.contains("/")) { - if (colonIndex == -1) { - hostname = target; - } else { - // must be the old style : - hostname = target.substring(0, colonIndex); - String portStr = target.substring(colonIndex + 1); - try { - port = Integer.parseInt(portStr); - } catch (NumberFormatException nfe) { - throw new IllegalArgumentException( - "Can't parse port '" + portStr + "'" - + helpText); - } - } - } else { - // a new uri - URI addr = new Path(target).toUri(); - hostname = addr.getHost(); - port = addr.getPort(); + boolean hasScheme = target.contains("://"); + URI uri = null; + try { + uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Does not contain a valid host:port authority: " + target + helpText + ); } + String host = uri.getHost(); + int port = uri.getPort(); if (port == -1) { port = defaultPort; } - - if (getStaticResolution(hostname) != null) { - hostname = getStaticResolution(hostname); + String path = uri.getPath(); + + if ((host == null) || (port < 0) || + (!hasScheme && path != null && !path.isEmpty())) + { + throw new IllegalArgumentException( + "Does not contain a valid host:port authority: " + target + helpText + ); } - return new InetSocketAddress(hostname, port); + return createSocketAddrForHost(host, port); + } + + /** + * Create a socket address with the given host and port. The hostname + * might be replaced with another host that was set via + * {@link #addStaticResolution(String, String)}. The value of + * hadoop.security.token.service.use_ip will determine whether the + * standard java host resolver is used, or if the fully qualified resolver + * is used. + * @param host the hostname or IP use to instantiate the object + * @param port the port number + * @return InetSocketAddress + */ + public static InetSocketAddress createSocketAddrForHost(String host, int port) { + String staticHost = getStaticResolution(host); + String resolveHost = (staticHost != null) ? staticHost : host; + + InetSocketAddress addr; + try { + InetAddress iaddr = hostResolver.getByName(resolveHost); + // if there is a static entry for the host, make the returned + // address look like the original given host + if (staticHost != null) { + iaddr = InetAddress.getByAddress(host, iaddr.getAddress()); + } + addr = new InetSocketAddress(iaddr, port); + } catch (UnknownHostException e) { + addr = InetSocketAddress.createUnresolved(host, port); + } + return addr; + } + + interface HostResolver { + InetAddress getByName(String host) throws UnknownHostException; + } + + /** + * Uses standard java host resolution + */ + static class StandardHostResolver implements HostResolver { + public InetAddress getByName(String host) throws UnknownHostException { + return InetAddress.getByName(host); + } + } + + /** + * This an alternate resolver with important properties that the standard + * java resolver lacks: + * 1) The hostname is fully qualified. This avoids security issues if not + * all hosts in the cluster do not share the same search domains. It + * also prevents other hosts from performing unnecessary dns searches. + * In contrast, InetAddress simply returns the host as given. + * 2) The InetAddress is instantiated with an exact host and IP to prevent + * further unnecessary lookups. InetAddress may perform an unnecessary + * reverse lookup for an IP. + * 3) A call to getHostName() will always return the qualified hostname, or + * more importantly, the IP if instantiated with an IP. This avoids + * unnecessary dns timeouts if the host is not resolvable. + * 4) Point 3 also ensures that if the host is re-resolved, ex. during a + * connection re-attempt, that a reverse lookup to host and forward + * lookup to IP is not performed since the reverse/forward mappings may + * not always return the same IP. If the client initiated a connection + * with an IP, then that IP is all that should ever be contacted. + * + * NOTE: this resolver is only used if: + * hadoop.security.token.service.use_ip=false + */ + protected static class QualifiedHostResolver implements HostResolver { + @SuppressWarnings("unchecked") + private List searchDomains = + ResolverConfiguration.open().searchlist(); + + /** + * Create an InetAddress with a fully qualified hostname of the given + * hostname. InetAddress does not qualify an incomplete hostname that + * is resolved via the domain search list. + * {@link InetAddress#getCanonicalHostName()} will fully qualify the + * hostname, but it always return the A record whereas the given hostname + * may be a CNAME. + * + * @param host a hostname or ip address + * @return InetAddress with the fully qualified hostname or ip + * @throws UnknownHostException if host does not exist + */ + public InetAddress getByName(String host) throws UnknownHostException { + InetAddress addr = null; + + if (IPAddressUtil.isIPv4LiteralAddress(host)) { + // use ipv4 address as-is + byte[] ip = IPAddressUtil.textToNumericFormatV4(host); + addr = InetAddress.getByAddress(host, ip); + } else if (IPAddressUtil.isIPv6LiteralAddress(host)) { + // use ipv6 address as-is + byte[] ip = IPAddressUtil.textToNumericFormatV6(host); + addr = InetAddress.getByAddress(host, ip); + } else if (host.endsWith(".")) { + // a rooted host ends with a dot, ex. "host." + // rooted hosts never use the search path, so only try an exact lookup + addr = getByExactName(host); + } else if (host.contains(".")) { + // the host contains a dot (domain), ex. "host.domain" + // try an exact host lookup, then fallback to search list + addr = getByExactName(host); + if (addr == null) { + addr = getByNameWithSearch(host); + } + } else { + // it's a simple host with no dots, ex. "host" + // try the search list, then fallback to exact host + InetAddress loopback = InetAddress.getByName(null); + if (host.equalsIgnoreCase(loopback.getHostName())) { + addr = InetAddress.getByAddress(host, loopback.getAddress()); + } else { + addr = getByNameWithSearch(host); + if (addr == null) { + addr = getByExactName(host); + } + } + } + // unresolvable! + if (addr == null) { + throw new UnknownHostException(host); + } + return addr; + } + + InetAddress getByExactName(String host) { + InetAddress addr = null; + // InetAddress will use the search list unless the host is rooted + // with a trailing dot. The trailing dot will disable any use of the + // search path in a lower level resolver. See RFC 1535. + String fqHost = host; + if (!fqHost.endsWith(".")) fqHost += "."; + try { + addr = getInetAddressByName(fqHost); + // can't leave the hostname as rooted or other parts of the system + // malfunction, ex. kerberos principals are lacking proper host + // equivalence for rooted/non-rooted hostnames + addr = InetAddress.getByAddress(host, addr.getAddress()); + } catch (UnknownHostException e) { + // ignore, caller will throw if necessary + } + return addr; + } + + InetAddress getByNameWithSearch(String host) { + InetAddress addr = null; + if (host.endsWith(".")) { // already qualified? + addr = getByExactName(host); + } else { + for (String domain : searchDomains) { + String dot = !domain.startsWith(".") ? "." : ""; + addr = getByExactName(host + dot + domain); + if (addr != null) break; + } + } + return addr; + } + + // implemented as a separate method to facilitate unit testing + InetAddress getInetAddressByName(String host) throws UnknownHostException { + return InetAddress.getByName(host); + } + + void setSearchDomains(String ... domains) { + searchDomains = Arrays.asList(domains); + } + } + + /** + * This is for testing only! + */ + @VisibleForTesting + static void setHostResolver(HostResolver newResolver) { + hostResolver = newResolver; + } + + /** + * Resolve the uri's hostname and add the default port if not in the uri + * @param uri to resolve + * @param defaultPort if none is given + * @return URI + */ + public static URI getCanonicalUri(URI uri, int defaultPort) { + // skip if there is no authority, ie. "file" scheme or relative uri + String host = uri.getHost(); + if (host == null) { + return uri; + } + String fqHost = canonicalizeHost(host); + int port = uri.getPort(); + // short out if already canonical with a port + if (host.equals(fqHost) && port != -1) { + return uri; + } + // reconstruct the uri with the canonical host and port + try { + uri = new URI(uri.getScheme(), uri.getUserInfo(), + fqHost, (port == -1) ? defaultPort : port, + uri.getPath(), uri.getQuery(), uri.getFragment()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + return uri; + } + + // cache the canonicalized hostnames; the cache currently isn't expired, + // but the canonicals will only change if the host's resolver configuration + // changes + private static final ConcurrentHashMap canonicalizedHostCache = + new ConcurrentHashMap(); + + private static String canonicalizeHost(String host) { + // check if the host has already been canonicalized + String fqHost = canonicalizedHostCache.get(host); + if (fqHost == null) { + try { + fqHost = hostResolver.getByName(host).getHostName(); + // slight race condition, but won't hurt + canonicalizedHostCache.put(host, fqHost); + } catch (UnknownHostException e) { + fqHost = host; + } + } + return fqHost; } /** @@ -279,8 +519,8 @@ public static List getAllStaticResolutions() { */ public static InetSocketAddress getConnectAddress(Server server) { InetSocketAddress addr = server.getListenerAddress(); - if (addr.getAddress().getHostAddress().equals("0.0.0.0")) { - addr = new InetSocketAddress("127.0.0.1", addr.getPort()); + if (addr.getAddress().isAnyLocalAddress()) { + addr = createSocketAddrForHost("127.0.0.1", addr.getPort()); } return addr; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java index b200ca51f5..a5e8c5d2b5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -50,6 +51,35 @@ public class SecurityUtil { public static final Log LOG = LogFactory.getLog(SecurityUtil.class); public static final String HOSTNAME_PATTERN = "_HOST"; + // controls whether buildTokenService will use an ip or host/ip as given + // by the user + private static boolean useIpForTokenService; + + static { + boolean useIp = new Configuration().getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP, + CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT); + setTokenServiceUseIp(useIp); + } + + /** + * For use only by tests and initialization + */ + @InterfaceAudience.Private + static void setTokenServiceUseIp(boolean flag) { + useIpForTokenService = flag; + NetUtils.setUseQualifiedHostResolver(!flag); + } + + /** + * Intended only for temporary use by NetUtils. Do not use. + * @return whether tokens use an IP address + */ + @InterfaceAudience.Private + public static boolean getTokenServiceUseIp() { + return useIpForTokenService; + } + /** * Find the original TGT within the current subject's credentials. Cross-realm * TGT's of the form "krbtgt/TWO.COM@ONE.COM" may be present. @@ -263,29 +293,20 @@ public static void login(final Configuration conf, } /** - * create service name for Delegation token ip:port - * @param uri - * @param defPort - * @return "ip:port" + * create the service name for a Delegation token + * @param uri of the service + * @param defPort is used if the uri lacks a port + * @return the token service, or null if no authority + * @see #buildTokenService(InetSocketAddress) */ public static String buildDTServiceName(URI uri, int defPort) { - int port = uri.getPort(); - if(port == -1) - port = defPort; - - // build the service name string "/ip:port" - // for whatever reason using NetUtils.createSocketAddr(target).toString() - // returns "localhost/ip:port" - StringBuffer sb = new StringBuffer(); - String host = uri.getHost(); - if (host != null) { - host = NetUtils.normalizeHostName(host); - } else { - host = ""; + String authority = uri.getAuthority(); + if (authority == null) { + return null; } - sb.append(host).append(":").append(port); - return sb.toString(); - } + InetSocketAddress addr = NetUtils.createSocketAddr(authority, defPort); + return buildTokenService(addr).toString(); + } /** * Get the host name from the principal name of format /host@realm. @@ -367,22 +388,58 @@ public static TokenInfo getTokenInfo(Class protocol, Configuration conf) { return null; } + /** + * Decode the given token's service field into an InetAddress + * @param token from which to obtain the service + * @return InetAddress for the service + */ + public static InetSocketAddress getTokenServiceAddr(Token token) { + return NetUtils.createSocketAddr(token.getService().toString()); + } + /** * Set the given token's service to the format expected by the RPC client * @param token a delegation token * @param addr the socket for the rpc connection */ public static void setTokenService(Token token, InetSocketAddress addr) { - token.setService(buildTokenService(addr)); + Text service = buildTokenService(addr); + if (token != null) { + token.setService(service); + LOG.info("Acquired token "+token); // Token#toString() prints service + } else { + LOG.warn("Failed to get token for service "+service); + } } /** * Construct the service key for a token * @param addr InetSocketAddress of remote connection with a token - * @return "ip:port" + * @return "ip:port" or "host:port" depending on the value of + * hadoop.security.token.service.use_ip */ public static Text buildTokenService(InetSocketAddress addr) { - String host = addr.getAddress().getHostAddress(); + String host = null; + if (useIpForTokenService) { + if (addr.isUnresolved()) { // host has no ip address + throw new IllegalArgumentException( + new UnknownHostException(addr.getHostName()) + ); + } + host = addr.getAddress().getHostAddress(); + } else { + host = addr.getHostName().toLowerCase(); + } return new Text(host + ":" + addr.getPort()); } + + /** + * Construct the service key for a token + * @param uri of remote connection with a token + * @return "ip:port" or "host:port" depending on the value of + * hadoop.security.token.service.use_ip + */ + public static Text buildTokenService(URI uri) { + return buildTokenService(NetUtils.createSocketAddr(uri.getAuthority())); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCanonicalization.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCanonicalization.java new file mode 100644 index 0000000000..70c77a5161 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCanonicalization.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.net.URI; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.net.NetUtilsTestResolver; +import org.apache.hadoop.util.Progressable; +import org.junit.Test; + +public class TestFileSystemCanonicalization extends TestCase { + static String[] authorities = { + "myfs://host", + "myfs://host.a", + "myfs://host.a.b", + }; + + static String[] ips = { + "myfs://127.0.0.1" + }; + + + @Test + public void testSetupResolver() throws Exception { + NetUtilsTestResolver.install(); + } + + // no ports + + @Test + public void testShortAuthority() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testPartialAuthority() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testFullAuthority() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a.b", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + // with default ports + + @Test + public void testShortAuthorityWithDefaultPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host:123", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testPartialAuthorityWithDefaultPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a:123", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testFullAuthorityWithDefaultPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a.b:123", "myfs://host.a.b:123"); + verifyPaths(fs, authorities, -1, true); + verifyPaths(fs, authorities, 123, true); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + // with non-standard ports + + @Test + public void testShortAuthorityWithOtherPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host:456", "myfs://host.a.b:456"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, true); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testPartialAuthorityWithOtherPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a:456", "myfs://host.a.b:456"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, true); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testFullAuthorityWithOtherPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://host.a.b:456", "myfs://host.a.b:456"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, true); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + // ips + + @Test + public void testIpAuthority() throws Exception { + FileSystem fs = getVerifiedFS("myfs://127.0.0.1", "myfs://127.0.0.1:123"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, true); + verifyPaths(fs, ips, 123, true); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testIpAuthorityWithDefaultPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://127.0.0.1:123", "myfs://127.0.0.1:123"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, true); + verifyPaths(fs, ips, 123, true); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testIpAuthorityWithOtherPort() throws Exception { + FileSystem fs = getVerifiedFS("myfs://127.0.0.1:456", "myfs://127.0.0.1:456"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, true); + } + + // bad stuff + + @Test + public void testMismatchedSchemes() throws Exception { + FileSystem fs = getVerifiedFS("myfs2://simple", "myfs2://simple:123"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testMismatchedHosts() throws Exception { + FileSystem fs = getVerifiedFS("myfs://simple", "myfs://simple:123"); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testNullAuthority() throws Exception { + FileSystem fs = getVerifiedFS("myfs:///", "myfs:///"); + verifyPaths(fs, new String[]{ "myfs://" }, -1, true); + verifyPaths(fs, authorities, -1, false); + verifyPaths(fs, authorities, 123, false); + verifyPaths(fs, authorities, 456, false); + verifyPaths(fs, ips, -1, false); + verifyPaths(fs, ips, 123, false); + verifyPaths(fs, ips, 456, false); + } + + @Test + public void testAuthorityFromDefaultFS() throws Exception { + Configuration config = new Configuration(); + String defaultFsKey = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY; + + FileSystem fs = getVerifiedFS("myfs://host", "myfs://host.a.b:123", config); + verifyPaths(fs, new String[]{ "myfs://" }, -1, false); + + config.set(defaultFsKey, "myfs://host"); + verifyPaths(fs, new String[]{ "myfs://" }, -1, true); + + config.set(defaultFsKey, "myfs2://host"); + verifyPaths(fs, new String[]{ "myfs://" }, -1, false); + + config.set(defaultFsKey, "myfs://host:123"); + verifyPaths(fs, new String[]{ "myfs://" }, -1, true); + + config.set(defaultFsKey, "myfs://host:456"); + verifyPaths(fs, new String[]{ "myfs://" }, -1, false); + } + + FileSystem getVerifiedFS(String authority, String canonical) throws Exception { + return getVerifiedFS(authority, canonical, new Configuration()); + } + + // create a fs from the authority, then check its uri against the given uri + // and the canonical. then try to fetch paths using the canonical + FileSystem getVerifiedFS(String authority, String canonical, Configuration conf) + throws Exception { + URI uri = URI.create(authority); + URI canonicalUri = URI.create(canonical); + + FileSystem fs = new DummyFileSystem(uri, conf); + assertEquals(uri, fs.getUri()); + assertEquals(canonicalUri, fs.getCanonicalUri()); + verifyCheckPath(fs, "/file", true); + return fs; + } + + void verifyPaths(FileSystem fs, String[] uris, int port, boolean shouldPass) { + for (String uri : uris) { + if (port != -1) uri += ":"+port; + verifyCheckPath(fs, uri+"/file", shouldPass); + } + } + + void verifyCheckPath(FileSystem fs, String path, boolean shouldPass) { + Path rawPath = new Path(path); + Path fqPath = null; + Exception e = null; + try { + fqPath = fs.makeQualified(rawPath); + } catch (IllegalArgumentException iae) { + e = iae; + } + if (shouldPass) { + assertEquals(null, e); + String pathAuthority = rawPath.toUri().getAuthority(); + if (pathAuthority == null) { + pathAuthority = fs.getUri().getAuthority(); + } + assertEquals(pathAuthority, fqPath.toUri().getAuthority()); + } else { + assertNotNull("did not fail", e); + assertEquals("Wrong FS: "+rawPath+", expected: "+fs.getUri(), + e.getMessage()); + } + } + + static class DummyFileSystem extends FileSystem { + URI uri; + static int defaultPort = 123; + + DummyFileSystem(URI uri, Configuration conf) throws IOException { + this.uri = uri; + setConf(conf); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + protected int getDefaultPort() { + return defaultPort; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public void setWorkingDirectory(Path new_dir) { + } + + @Override + public Path getWorkingDirectory() { + return new Path("/"); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + throw new IOException("not supposed to be here"); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + throw new IOException("not supposed to be here"); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java index 59eefca045..55484913cc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; @@ -213,8 +214,7 @@ public MiniProtocol run() throws IOException { token = p.getDelegationToken(new Text(RENEWER)); currentUgi = UserGroupInformation.createUserForTesting(MINI_USER, GROUP_NAMES); - token.setService(new Text(addr.getAddress().getHostAddress() - + ":" + addr.getPort())); + SecurityUtil.setTokenService(token, addr); currentUgi.addToken(token); return p; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index 0b186a1eb1..9246fd5d72 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.net.NetUtils; @@ -286,10 +287,7 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm .getUserName())); Token token = new Token(tokenId, sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); - LOG.info("Service IP address for token is " + host); + SecurityUtil.setTokenService(token, addr); current.addToken(token); TestSaslProtocol proxy = null; @@ -311,14 +309,17 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm public void testPingInterval() throws Exception { Configuration newConf = new Configuration(conf); newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_1); - conf.setInt(Client.PING_INTERVAL_NAME, Client.DEFAULT_PING_INTERVAL); + conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, + CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); + // set doPing to true - newConf.setBoolean("ipc.client.ping", true); + newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); ConnectionId remoteId = ConnectionId.getConnectionId( new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf); - assertEquals(Client.DEFAULT_PING_INTERVAL, remoteId.getPingInterval()); + assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, + remoteId.getPingInterval()); // set doPing to false - newConf.setBoolean("ipc.client.ping", false); + newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); remoteId = ConnectionId.getConnectionId( new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf); assertEquals(0, remoteId.getPingInterval()); @@ -358,10 +359,7 @@ public void testPerConnectionConf() throws Exception { .getUserName())); Token token = new Token(tokenId, sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); - LOG.info("Service IP address for token is " + host); + SecurityUtil.setTokenService(token, addr); current.addToken(token); Configuration newConf = new Configuration(conf); @@ -448,10 +446,7 @@ public void testDigestAuthMethod() throws Exception { .getUserName())); Token token = new Token(tokenId, sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); - LOG.info("Service IP address for token is " + host); + SecurityUtil.setTokenService(token, addr); current.addToken(token); current.doAs(new PrivilegedExceptionAction() { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/NetUtilsTestResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/NetUtilsTestResolver.java new file mode 100644 index 0000000000..819264c8b1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/NetUtilsTestResolver.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.net; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.net.NetUtils.QualifiedHostResolver; + +/** + * provides a dummy dns search resolver with a configurable search path + * and host mapping + */ +public class NetUtilsTestResolver extends QualifiedHostResolver { + Map resolvedHosts = new HashMap(); + List hostSearches = new LinkedList(); + + public static NetUtilsTestResolver install() { + NetUtilsTestResolver resolver = new NetUtilsTestResolver(); + resolver.setSearchDomains("a.b", "b", "c"); + resolver.addResolvedHost("host.a.b.", "1.1.1.1"); + resolver.addResolvedHost("b-host.b.", "2.2.2.2"); + resolver.addResolvedHost("simple.", "3.3.3.3"); + NetUtils.setHostResolver(resolver); + return resolver; + } + + public void addResolvedHost(String host, String ip) { + InetAddress addr; + try { + addr = InetAddress.getByName(ip); + addr = InetAddress.getByAddress(host, addr.getAddress()); + } catch (UnknownHostException e) { + throw new IllegalArgumentException("not an ip:"+ip); + } + resolvedHosts.put(host, addr); + } + + InetAddress getInetAddressByName(String host) throws UnknownHostException { + hostSearches.add(host); + if (!resolvedHosts.containsKey(host)) { + throw new UnknownHostException(host); + } + return resolvedHosts.get(host); + } + + String[] getHostSearches() { + return hostSearches.toArray(new String[0]); + } + + void reset() { + hostSearches.clear(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java index 3ddac25397..7779980020 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java @@ -17,25 +17,29 @@ */ package org.apache.hadoop.net; -import junit.framework.AssertionFailedError; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.junit.Test; - import static org.junit.Assert.*; import java.io.IOException; import java.net.BindException; +import java.net.ConnectException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.Socket; -import java.net.ConnectException; import java.net.SocketException; -import java.net.InetSocketAddress; +import java.net.URI; import java.net.UnknownHostException; import java.util.Enumeration; +import junit.framework.AssertionFailedError; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; public class TestNetUtils { @@ -248,4 +252,255 @@ private IOException verifyExceptionClass(IOException e, } return wrapped; } -} + + static NetUtilsTestResolver resolver; + static Configuration config; + + @BeforeClass + public static void setupResolver() { + resolver = NetUtilsTestResolver.install(); + } + + @Before + public void resetResolver() { + resolver.reset(); + config = new Configuration(); + } + + // getByExactName + + private void verifyGetByExactNameSearch(String host, String ... searches) { + assertNull(resolver.getByExactName(host)); + assertBetterArrayEquals(searches, resolver.getHostSearches()); + } + + @Test + public void testResolverGetByExactNameUnqualified() { + verifyGetByExactNameSearch("unknown", "unknown."); + } + + @Test + public void testResolverGetByExactNameUnqualifiedWithDomain() { + verifyGetByExactNameSearch("unknown.domain", "unknown.domain."); + } + + @Test + public void testResolverGetByExactNameQualified() { + verifyGetByExactNameSearch("unknown.", "unknown."); + } + + @Test + public void testResolverGetByExactNameQualifiedWithDomain() { + verifyGetByExactNameSearch("unknown.domain.", "unknown.domain."); + } + + // getByNameWithSearch + + private void verifyGetByNameWithSearch(String host, String ... searches) { + assertNull(resolver.getByNameWithSearch(host)); + assertBetterArrayEquals(searches, resolver.getHostSearches()); + } + + @Test + public void testResolverGetByNameWithSearchUnqualified() { + String host = "unknown"; + verifyGetByNameWithSearch(host, host+".a.b.", host+".b.", host+".c."); + } + + @Test + public void testResolverGetByNameWithSearchUnqualifiedWithDomain() { + String host = "unknown.domain"; + verifyGetByNameWithSearch(host, host+".a.b.", host+".b.", host+".c."); + } + + @Test + public void testResolverGetByNameWithSearchQualified() { + String host = "unknown."; + verifyGetByNameWithSearch(host, host); + } + + @Test + public void testResolverGetByNameWithSearchQualifiedWithDomain() { + String host = "unknown.domain."; + verifyGetByNameWithSearch(host, host); + } + + // getByName + + private void verifyGetByName(String host, String ... searches) { + InetAddress addr = null; + try { + addr = resolver.getByName(host); + } catch (UnknownHostException e) {} // ignore + assertNull(addr); + assertBetterArrayEquals(searches, resolver.getHostSearches()); + } + + @Test + public void testResolverGetByNameQualified() { + String host = "unknown."; + verifyGetByName(host, host); + } + + @Test + public void testResolverGetByNameQualifiedWithDomain() { + verifyGetByName("unknown.domain.", "unknown.domain."); + } + + @Test + public void testResolverGetByNameUnqualified() { + String host = "unknown"; + verifyGetByName(host, host+".a.b.", host+".b.", host+".c.", host+"."); + } + + @Test + public void testResolverGetByNameUnqualifiedWithDomain() { + String host = "unknown.domain"; + verifyGetByName(host, host+".", host+".a.b.", host+".b.", host+".c."); + } + + // resolving of hosts + + private InetAddress verifyResolve(String host, String ... searches) { + InetAddress addr = null; + try { + addr = resolver.getByName(host); + } catch (UnknownHostException e) {} // ignore + assertNotNull(addr); + assertBetterArrayEquals(searches, resolver.getHostSearches()); + return addr; + } + + private void + verifyInetAddress(InetAddress addr, String host, String ip) { + assertNotNull(addr); + assertEquals(host, addr.getHostName()); + assertEquals(ip, addr.getHostAddress()); + } + + @Test + public void testResolverUnqualified() { + String host = "host"; + InetAddress addr = verifyResolve(host, host+".a.b."); + verifyInetAddress(addr, "host.a.b", "1.1.1.1"); + } + + @Test + public void testResolverUnqualifiedWithDomain() { + String host = "host.a"; + InetAddress addr = verifyResolve(host, host+".", host+".a.b.", host+".b."); + verifyInetAddress(addr, "host.a.b", "1.1.1.1"); + } + + @Test + public void testResolverUnqualifedFull() { + String host = "host.a.b"; + InetAddress addr = verifyResolve(host, host+"."); + verifyInetAddress(addr, host, "1.1.1.1"); + } + + @Test + public void testResolverQualifed() { + String host = "host.a.b."; + InetAddress addr = verifyResolve(host, host); + verifyInetAddress(addr, host, "1.1.1.1"); + } + + // localhost + + @Test + public void testResolverLoopback() { + String host = "Localhost"; + InetAddress addr = verifyResolve(host); // no lookup should occur + verifyInetAddress(addr, "Localhost", "127.0.0.1"); + } + + @Test + public void testResolverIP() { + String host = "1.1.1.1"; + InetAddress addr = verifyResolve(host); // no lookup should occur for ips + verifyInetAddress(addr, host, host); + } + + // + + @Test + public void testCanonicalUriWithPort() { + URI uri; + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123"), 456); + assertEquals("scheme://host.a.b:123", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/"), 456); + assertEquals("scheme://host.a.b:123/", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/path"), 456); + assertEquals("scheme://host.a.b:123/path", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/path?q#frag"), 456); + assertEquals("scheme://host.a.b:123/path?q#frag", uri.toString()); + } + + @Test + public void testCanonicalUriWithDefaultPort() { + URI uri; + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host"), 123); + assertEquals("scheme://host.a.b:123", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host/"), 123); + assertEquals("scheme://host.a.b:123/", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path"), 123); + assertEquals("scheme://host.a.b:123/path", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path?q#frag"), 123); + assertEquals("scheme://host.a.b:123/path?q#frag", uri.toString()); + } + + @Test + public void testCanonicalUriWithPath() { + URI uri; + + uri = NetUtils.getCanonicalUri(URI.create("path"), 2); + assertEquals("path", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("/path"), 2); + assertEquals("/path", uri.toString()); + } + + @Test + public void testCanonicalUriWithNoAuthority() { + URI uri; + + uri = NetUtils.getCanonicalUri(URI.create("scheme:/"), 2); + assertEquals("scheme:/", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme:/path"), 2); + assertEquals("scheme:/path", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme:///"), 2); + assertEquals("scheme:///", uri.toString()); + + uri = NetUtils.getCanonicalUri(URI.create("scheme:///path"), 2); + assertEquals("scheme:///path", uri.toString()); + } + + @Test + public void testCanonicalUriWithNoHost() { + URI uri = NetUtils.getCanonicalUri(URI.create("scheme://:123/path"), 2); + assertEquals("scheme://:123/path", uri.toString()); + } + + @Test + public void testCanonicalUriWithNoPortNoDefaultPort() { + URI uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path"), -1); + assertEquals("scheme://host.a.b/path", uri.toString()); + } + + private void assertBetterArrayEquals(T[] expect, T[]got) { + String expectStr = StringUtils.join(expect, ", "); + String gotStr = StringUtils.join(got, ", "); + assertEquals(expectStr, gotStr); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index 5f60a7d3ac..121877bf58 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -418,9 +418,7 @@ public void testProxyWithToken() throws Exception { .getUserName()), new Text("SomeSuperUser")); Token token = new Token(tokenId, sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); + SecurityUtil.setTokenService(token, addr); UserGroupInformation proxyUserUgi = UserGroupInformation .createProxyUserForTesting(PROXY_USER_NAME, current, GROUP_NAMES); proxyUserUgi.addToken(token); @@ -476,9 +474,7 @@ public void testTokenBySuperUser() throws Exception { .getUserName()), new Text("SomeSuperUser")); Token token = new Token(tokenId, sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); + SecurityUtil.setTokenService(token, addr); current.addToken(token); String retVal = current.doAs(new PrivilegedExceptionAction() { @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java index e3358e9fd4..cac0160cbd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java @@ -16,16 +16,19 @@ */ package org.apache.hadoop.security; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; import javax.security.auth.kerberos.KerberosPrincipal; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; import org.junit.Test; import org.mockito.Mockito; @@ -121,4 +124,213 @@ public void testGetHostFromPrincipal() { assertEquals(null, SecurityUtil.getHostFromPrincipal("service@realm")); } + + @Test + public void testBuildDTServiceName() { + assertEquals("127.0.0.1:123", + SecurityUtil.buildDTServiceName(URI.create("test://LocalHost"), 123) + ); + assertEquals("127.0.0.1:123", + SecurityUtil.buildDTServiceName(URI.create("test://LocalHost:123"), 456) + ); + assertEquals("127.0.0.1:123", + SecurityUtil.buildDTServiceName(URI.create("test://127.0.0.1"), 123) + ); + assertEquals("127.0.0.1:123", + SecurityUtil.buildDTServiceName(URI.create("test://127.0.0.1:123"), 456) + ); + } + + @Test + public void testBuildTokenServiceSockAddr() { + assertEquals("127.0.0.1:123", + SecurityUtil.buildTokenService(new InetSocketAddress("LocalHost", 123)).toString() + ); + assertEquals("127.0.0.1:123", + SecurityUtil.buildTokenService(new InetSocketAddress("127.0.0.1", 123)).toString() + ); + // what goes in, comes out + assertEquals("127.0.0.1:123", + SecurityUtil.buildTokenService(NetUtils.createSocketAddr("127.0.0.1", 123)).toString() + ); + } + + @Test + public void testGoodHostsAndPorts() { + InetSocketAddress compare = NetUtils.createSocketAddrForHost("localhost", 123); + runGoodCases(compare, "localhost", 123); + runGoodCases(compare, "localhost:", 123); + runGoodCases(compare, "localhost:123", 456); + } + + void runGoodCases(InetSocketAddress addr, String host, int port) { + assertEquals(addr, NetUtils.createSocketAddr(host, port)); + assertEquals(addr, NetUtils.createSocketAddr("hdfs://"+host, port)); + assertEquals(addr, NetUtils.createSocketAddr("hdfs://"+host+"/path", port)); + } + + @Test + public void testBadHostsAndPorts() { + runBadCases("", true); + runBadCases(":", false); + runBadCases("hdfs/", false); + runBadCases("hdfs:/", false); + runBadCases("hdfs://", true); + } + + void runBadCases(String prefix, boolean validIfPosPort) { + runBadPortPermutes(prefix, false); + runBadPortPermutes(prefix+"*", false); + runBadPortPermutes(prefix+"localhost", validIfPosPort); + runBadPortPermutes(prefix+"localhost:-1", false); + runBadPortPermutes(prefix+"localhost:-123", false); + runBadPortPermutes(prefix+"localhost:xyz", false); + runBadPortPermutes(prefix+"localhost/xyz", validIfPosPort); + runBadPortPermutes(prefix+"localhost/:123", validIfPosPort); + runBadPortPermutes(prefix+":123", false); + runBadPortPermutes(prefix+":xyz", false); + } + + void runBadPortPermutes(String arg, boolean validIfPosPort) { + int ports[] = { -123, -1, 123 }; + boolean bad = false; + try { + NetUtils.createSocketAddr(arg); + } catch (IllegalArgumentException e) { + bad = true; + } finally { + assertTrue("should be bad: '"+arg+"'", bad); + } + for (int port : ports) { + if (validIfPosPort && port > 0) continue; + + bad = false; + try { + NetUtils.createSocketAddr(arg, port); + } catch (IllegalArgumentException e) { + bad = true; + } finally { + assertTrue("should be bad: '"+arg+"' (default port:"+port+")", bad); + } + } + } + + // check that the socket addr has: + // 1) the InetSocketAddress has the correct hostname, ie. exact host/ip given + // 2) the address is resolved, ie. has an ip + // 3,4) the socket's InetAddress has the same hostname, and the correct ip + // 5) the port is correct + private void + verifyValues(InetSocketAddress addr, String host, String ip, int port) { + assertTrue(!addr.isUnresolved()); + // don't know what the standard resolver will return for hostname. + // should be host for host; host or ip for ip is ambiguous + if (!SecurityUtil.getTokenServiceUseIp()) { + assertEquals(host, addr.getHostName()); + assertEquals(host, addr.getAddress().getHostName()); + } + assertEquals(ip, addr.getAddress().getHostAddress()); + assertEquals(port, addr.getPort()); + } + + // check: + // 1) buildTokenService honors use_ip setting + // 2) setTokenService & getService works + // 3) getTokenServiceAddr decodes to the identical socket addr + private void + verifyTokenService(InetSocketAddress addr, String host, String ip, int port, boolean useIp) { + //LOG.info("address:"+addr+" host:"+host+" ip:"+ip+" port:"+port); + + SecurityUtil.setTokenServiceUseIp(useIp); + String serviceHost = useIp ? ip : host.toLowerCase(); + + Token token = new Token(); + Text service = new Text(serviceHost+":"+port); + + assertEquals(service, SecurityUtil.buildTokenService(addr)); + SecurityUtil.setTokenService(token, addr); + assertEquals(service, token.getService()); + + InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); + assertNotNull(serviceAddr); + verifyValues(serviceAddr, serviceHost, ip, port); + } + + // check: + // 1) socket addr is created with fields set as expected + // 2) token service with ips + // 3) token service with the given host or ip + private void + verifyAddress(InetSocketAddress addr, String host, String ip, int port) { + verifyValues(addr, host, ip, port); + //LOG.info("test that token service uses ip"); + verifyTokenService(addr, host, ip, port, true); + //LOG.info("test that token service uses host"); + verifyTokenService(addr, host, ip, port, false); + } + + // check: + // 1-4) combinations of host and port + // this will construct a socket addr, verify all the fields, build the + // service to verify the use_ip setting is honored, set the token service + // based on addr and verify the token service is set correctly, decode + // the token service and ensure all the fields of the decoded addr match + private void verifyServiceAddr(String host, String ip) { + InetSocketAddress addr; + int port = 123; + + // test host, port tuple + //LOG.info("test tuple ("+host+","+port+")"); + addr = NetUtils.createSocketAddrForHost(host, port); + verifyAddress(addr, host, ip, port); + + // test authority with no default port + //LOG.info("test authority '"+host+":"+port+"'"); + addr = NetUtils.createSocketAddr(host+":"+port); + verifyAddress(addr, host, ip, port); + + // test authority with a default port, make sure default isn't used + //LOG.info("test authority '"+host+":"+port+"' with ignored default port"); + addr = NetUtils.createSocketAddr(host+":"+port, port+1); + verifyAddress(addr, host, ip, port); + + // test host-only authority, using port as default port + //LOG.info("test host:"+host+" port:"+port); + addr = NetUtils.createSocketAddr(host, port); + verifyAddress(addr, host, ip, port); + } + + @Test + public void testSocketAddrWithName() { + String staticHost = "my"; + NetUtils.addStaticResolution(staticHost, "localhost"); + verifyServiceAddr("LocalHost", "127.0.0.1"); + } + + @Test + public void testSocketAddrWithIP() { + verifyServiceAddr("127.0.0.1", "127.0.0.1"); + } + + @Test + public void testSocketAddrWithNameToStaticName() { + String staticHost = "host1"; + NetUtils.addStaticResolution(staticHost, "localhost"); + verifyServiceAddr(staticHost, "127.0.0.1"); + } + + @Test + public void testSocketAddrWithNameToStaticIP() { + String staticHost = "host3"; + NetUtils.addStaticResolution(staticHost, "255.255.255.255"); + verifyServiceAddr(staticHost, "255.255.255.255"); + } + + // this is a bizarre case, but it's if a test tries to remap an ip address + @Test + public void testSocketAddrWithIPToStaticIP() { + String staticHost = "1.2.3.4"; + NetUtils.addStaticResolution(staticHost, "255.255.255.255"); + verifyServiceAddr(staticHost, "255.255.255.255"); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c1e7bd92e3..0f274bd165 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -258,10 +258,13 @@ Release 0.23.1 - UNRELEASED HDFS-2335. DataNodeCluster and NNStorage always pull fresh entropy. (Uma Maheswara Rao G via eli) - HDFS-2574. Remove references to some deprecated properties in conf templates and defaults files. (Joe Crobak via harsh) + HDFS-2574. Remove references to some deprecated properties in conf + templates and defaults files. (Joe Crobak via harsh) HDFS-2722. HttpFs should not be using an int for block size. (harsh) + HDFS-2710. Add HDFS tests related to HADOOP-7933. (sid via suresh) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java index 66b8fa32a3..f2b9a23237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java @@ -20,44 +20,57 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.List; import javax.security.auth.login.LoginException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; - import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Test; public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest { private static MiniDFSCluster cluster; private static Path defaultWorkingDirectory; + private static Path defaultWorkingDirectory2; private static Configuration CONF = new Configuration(); private static FileSystem fHdfs; + private static FileSystem fHdfs2; + private FileSystem fsTarget2; + Path targetTestRoot2; @BeforeClass public static void clusterSetupAtBegining() throws IOException, LoginException, URISyntaxException { SupportsBlocks = true; - cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build(); + cluster = + new MiniDFSCluster.Builder(CONF).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); cluster.waitClusterUp(); - NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads(); - fHdfs = cluster.getFileSystem(); + NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(0)).startThreads(); + NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(1)).startThreads(); + + fHdfs = cluster.getFileSystem(0); + fHdfs2 = cluster.getFileSystem(1); + defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" + UserGroupInformation.getCurrentUser().getShortUserName())); + defaultWorkingDirectory2 = fHdfs2.makeQualified( new Path("/user/" + + UserGroupInformation.getCurrentUser().getShortUserName())); + fHdfs.mkdirs(defaultWorkingDirectory); + fHdfs2.mkdirs(defaultWorkingDirectory2); } @@ -70,25 +83,42 @@ public static void ClusterShutdownAtEnd() throws Exception { public void setUp() throws Exception { // create the test root on local_fs fsTarget = fHdfs; + fsTarget2 = fHdfs2; + targetTestRoot2 = FileSystemTestHelper.getAbsoluteTestRootPath(fsTarget2); super.setUp(); - } @After public void tearDown() throws Exception { super.tearDown(); } - - /* - * This overides the default implementation since hdfs does have delegation - * tokens. - */ + @Override - @Test - public void testGetDelegationTokens() throws IOException { - List> delTokens = - fsView.getDelegationTokens("sanjay"); - Assert.assertEquals(7, delTokens.size()); + void setupMountPoints() { + super.setupMountPoints(); + ConfigUtil.addLink(conf, "/mountOnNn2", new Path(targetTestRoot2, + "mountOnNn2").toUri()); } + // Overriden test helper methods - changed values based on hdfs and the + // additional mount. + @Override + int getExpectedDirPaths() { + return 7; + } + + @Override + int getExpectedMountPoints() { + return 8; + } + + @Override + int getExpectedDelegationTokenCount() { + return 8; + } + + @Override + int getExpectedDelegationTokenCountWithCredentials() { + return 2; + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5e341a1909..ea291382f6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -180,6 +180,8 @@ Release 0.23.1 - Unreleased MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh) + MAPREDUCE-3478. Cannot build against ZooKeeper 3.4.0. (Tom White via mahadev) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar @@ -191,6 +193,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv) + MAPREDUCE-3569. TaskAttemptListener holds a global lock for all + task-updates. (Vinod Kumar Vavilapalli via sseth) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob @@ -395,6 +400,20 @@ Release 0.23.1 - Unreleased MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when adding a file/archive to the path. (Dick King via tucu) + MAPREDUCE-3529. TokenCache does not cache viewfs credentials correctly + (sseth) + + MAPREDUCE-3595. Add missing TestCounters#testCounterValue test from branch + 1 to 0.23 (Tom White via sseth) + + MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks. + (vinodkv via acmurthy) + + MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for + performance reasons. (vinodkv via acmurthy) + + MAPREDUCE-3615. Fix some ant test failures. (Thomas Graves via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java index de77711956..6c098259a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.util.Apps; +@SuppressWarnings("deprecation") public class MapReduceChildJVM { private static String getTaskLogFile(LogName filter) { @@ -46,7 +47,7 @@ private static String getChildEnv(JobConf jobConf, boolean isMap) { jobConf.get(JobConf.MAPRED_TASK_ENV)); } return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV, - jobConf.get(jobConf.MAPRED_TASK_ENV)); + jobConf.get(JobConf.MAPRED_TASK_ENV)); } private static String getChildLogLevel(JobConf conf, boolean isMap) { @@ -68,29 +69,9 @@ public static void setVMEnv(Map environment, JobConf conf = task.conf; - // Shell - environment.put( - Environment.SHELL.name(), - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_SHELL, - MRJobConfig.DEFAULT_SHELL) - ); - - // Add pwd to LD_LIBRARY_PATH, add this before adding anything else - Apps.addToEnvironment( - environment, - Environment.LD_LIBRARY_PATH.name(), - Environment.PWD.$()); - - // Add the env variables passed by the user & admin + // Add the env variables passed by the user String mapredChildEnv = getChildEnv(conf, task.isMapTask()); Apps.setEnvFromInputString(environment, mapredChildEnv); - Apps.setEnvFromInputString( - environment, - conf.get( - MRJobConfig.MAPRED_ADMIN_USER_ENV, - MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV) - ); // Set logging level in the environment. // This is so that, if the child forks another "bin/hadoop" (common in diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index ba0068098e..0e6e3eed04 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -19,14 +19,12 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,21 +62,22 @@ * This class HAS to be in this package to access package private * methods/classes. */ +@SuppressWarnings({"unchecked" , "deprecation"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { + private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true); + private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class); private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; private InetSocketAddress address; - private Map jvmIDToActiveAttemptMap = - Collections.synchronizedMap(new HashMap()); + private ConcurrentMap + jvmIDToActiveAttemptMap + = new ConcurrentHashMap(); private JobTokenSecretManager jobTokenSecretManager = null; - private Set pendingJvms = - Collections.synchronizedSet(new HashSet()); public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) { @@ -123,10 +122,9 @@ protected void startRpcServer() { server.start(); InetSocketAddress listenerAddress = server.getListenerAddress(); - this.address = - NetUtils.createSocketAddr(listenerAddress.getAddress() - .getLocalHost().getCanonicalHostName() - + ":" + listenerAddress.getPort()); + listenerAddress.getAddress(); + this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost() + .getCanonicalHostName() + ":" + listenerAddress.getPort()); } catch (IOException e) { throw new YarnException(e); } @@ -408,57 +406,59 @@ public JvmTask getTask(JvmContext context) throws IOException { WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); - synchronized(this) { - if(pendingJvms.contains(wJvmID)) { - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID); - if (task != null) { //there may be lag in the attempt getting added here - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - //remove the task as it is no more needed and free up the memory - //Also we have already told the JVM to process a task, so it is no - //longer pending, and further request should ask it to exit. - pendingJvms.remove(wJvmID); - jvmIDToActiveAttemptMap.remove(wJvmID); - } - } else { - LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); - jvmTask = new JvmTask(null, true); - } + // Try to look up the task. We remove it directly as we don't give + // multiple tasks to a JVM + org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap + .remove(wJvmID); + if (task != null) { + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + + // remove the task as it is no more needed and free up the memory + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further request should ask it to exit. + } else { + LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); + jvmTask = TASK_FOR_INVALID_JVM; } return jvmTask; } @Override - public synchronized void registerPendingTask(WrappedJvmID jvmID) { - //Save this JVM away as one that has not been handled yet - pendingJvms.add(jvmID); + public void registerPendingTask( + org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { + // Create the mapping so that it is easy to look up + // when the jvm comes back to ask for Task. + + // A JVM not present in this map is an illegal task/JVM. + jvmIDToActiveAttemptMap.put(jvmID, task); } @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { - synchronized(this) { - //create the mapping so that it is easy to look up - //when it comes back to ask for Task. - jvmIDToActiveAttemptMap.put(jvmID, task); - //This should not need to happen here, but just to be on the safe side - if(!pendingJvms.add(jvmID)) { - LOG.warn(jvmID+" launched without first being registered"); - } - } - //register this attempt + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + + // The task is launched. Register this for expiry-tracking. + + // Timing can cause this to happen after the real JVM launches and gets a + // task which is still fine as we will only be tracking for expiry a little + // late than usual. taskHeartbeatHandler.register(attemptID); } @Override - public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + public void unregister( + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, WrappedJvmID jvmID) { - //remove the mapping if not already removed + + // Unregistration also comes from the same TaskAttempt which does the + // registration. Events are ordered at TaskAttempt, so unregistration will + // always come after registration. + + // remove the mapping if not already removed jvmIDToActiveAttemptMap.remove(jvmID); - //remove the pending if not already removed - pendingJvms.remove(jvmID); + //unregister this attempt taskHeartbeatHandler.unregister(attemptID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index b5e5cd37b2..7002e69d52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -32,20 +32,21 @@ public interface TaskAttemptListener { InetSocketAddress getAddress(); /** - * register a JVM with the listener. This should be called as soon as a + * Register a JVM with the listener. This should be called as soon as a * JVM ID is assigned to a task attempt, before it has been launched. + * @param task the task itself for this JVM. * @param jvmID The ID of the JVM . */ - void registerPendingTask(WrappedJvmID jvmID); + void registerPendingTask(Task task, WrappedJvmID jvmID); /** - * Register the task and task attempt with the JVM. This should be called - * when the JVM has been launched. - * @param attemptID the id of the attempt for this JVM. - * @param task the task itself for this JVM. - * @param jvmID the id of the JVM handling the task. + * Register task attempt. This should be called when the JVM has been + * launched. + * + * @param attemptID + * the id of the attempt for this JVM. */ - void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + void registerLaunchedTask(TaskAttemptId attemptID); /** * Unregister the JVM and the attempt associated with it. This should be diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 52d3b47b62..4655895dbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -154,6 +156,8 @@ public abstract class TaskAttemptImpl implements private Token jobToken; private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); private static String initialClasspath = null; + private static Object commonContainerSpecLock = new Object(); + private static ContainerLaunchContext commonContainerSpec = null; private static final Object classpathLock = new Object(); private long launchTime; private long finishTime; @@ -497,29 +501,27 @@ private int getMemoryRequired(Configuration conf, TaskType taskType) { /** * Create a {@link LocalResource} record with all the given parameters. - * TODO: This should pave way for Builder pattern. */ - private static LocalResource createLocalResource(FileSystem fc, - RecordFactory recordFactory, Path file, LocalResourceType type, - LocalResourceVisibility visibility) throws IOException { + private static LocalResource createLocalResource(FileSystem fc, Path file, + LocalResourceType type, LocalResourceVisibility visibility) + throws IOException { FileStatus fstat = fc.getFileStatus(file); - LocalResource resource = - recordFactory.newRecordInstance(LocalResource.class); - resource.setResource(ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat - .getPath()))); - resource.setType(type); - resource.setVisibility(visibility); - resource.setSize(fstat.getLen()); - resource.setTimestamp(fstat.getModificationTime()); - return resource; + URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat + .getPath())); + long resourceSize = fstat.getLen(); + long resourceModificationTime = fstat.getModificationTime(); + + return BuilderUtils.newLocalResource(resourceURL, type, visibility, + resourceSize, resourceModificationTime); } /** * Lock this on initialClasspath so that there is only one fork in the AM for - * getting the initial class-path. TODO: This should go away once we construct - * a parent CLC and use it for all the containers. + * getting the initial class-path. TODO: We already construct + * a parent CLC and use it for all the containers, so this should go away + * once the mr-generated-classpath stuff is gone. */ - private String getInitialClasspath() throws IOException { + private static String getInitialClasspath() throws IOException { synchronized (classpathLock) { if (initialClasspathFlag.get()) { return initialClasspath; @@ -534,11 +536,14 @@ private String getInitialClasspath() throws IOException { /** - * Create the {@link ContainerLaunchContext} for this attempt. + * Create the common {@link ContainerLaunchContext} for all attempts. * @param applicationACLs */ - private ContainerLaunchContext createContainerLaunchContext( - Map applicationACLs) { + private static ContainerLaunchContext createCommonContainerLaunchContext( + Map applicationACLs, Configuration conf, + Token jobToken, + final org.apache.hadoop.mapred.JobID oldJobId, + Collection> fsTokens) { // Application resources Map localResources = @@ -556,13 +561,13 @@ private ContainerLaunchContext createContainerLaunchContext( FileSystem remoteFS = FileSystem.get(conf); // //////////// Set up JobJar to be localized properly on the remote NM. - if (conf.get(MRJobConfig.JAR) != null) { - Path remoteJobJar = (new Path(remoteTask.getConf().get( - MRJobConfig.JAR))).makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); + String jobJar = conf.get(MRJobConfig.JAR); + if (jobJar != null) { + Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS + .getUri(), remoteFS.getWorkingDirectory()); localResources.put( MRJobConfig.JOB_JAR, - createLocalResource(remoteFS, recordFactory, remoteJobJar, + createLocalResource(remoteFS, remoteJobJar, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-jar file on the remote FS is " + remoteJobJar.toUri().toASCIIString()); @@ -584,7 +589,7 @@ private ContainerLaunchContext createContainerLaunchContext( new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); localResources.put( MRJobConfig.JOB_CONF_FILE, - createLocalResource(remoteFS, recordFactory, remoteJobConfPath, + createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-conf file on the remote FS is " + remoteJobConfPath.toUri().toASCIIString()); @@ -630,19 +635,81 @@ private ContainerLaunchContext createContainerLaunchContext( throw new YarnException(e); } - // Setup environment - MapReduceChildJVM.setVMEnv(environment, remoteTask); + // Shell + environment.put( + Environment.SHELL.name(), + conf.get( + MRJobConfig.MAPRED_ADMIN_USER_SHELL, + MRJobConfig.DEFAULT_SHELL) + ); + + // Add pwd to LD_LIBRARY_PATH, add this before adding anything else + Apps.addToEnvironment( + environment, + Environment.LD_LIBRARY_PATH.name(), + Environment.PWD.$()); + + // Add the env variables passed by the admin + Apps.setEnvFromInputString( + environment, + conf.get( + MRJobConfig.MAPRED_ADMIN_USER_ENV, + MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV) + ); + + // Construct the actual Container + // The null fields are per-container and will be constructed for each + // container separately. + ContainerLaunchContext container = BuilderUtils + .newContainerLaunchContext(null, conf + .get(MRJobConfig.USER_NAME), null, localResources, + environment, null, serviceData, tokens, applicationACLs); + + return container; + } + + static ContainerLaunchContext createContainerLaunchContext( + Map applicationACLs, + ContainerId containerID, Configuration conf, + Token jobToken, Task remoteTask, + final org.apache.hadoop.mapred.JobID oldJobId, + Resource assignedCapability, WrappedJvmID jvmID, + TaskAttemptListener taskAttemptListener, + Collection> fsTokens) { + + synchronized (commonContainerSpecLock) { + if (commonContainerSpec == null) { + commonContainerSpec = createCommonContainerLaunchContext( + applicationACLs, conf, jobToken, oldJobId, fsTokens); + } + } + + // Fill in the fields needed per-container that are missing in the common + // spec. + + // Setup environment by cloning from common env. + Map env = commonContainerSpec.getEnvironment(); + Map myEnv = new HashMap(env.size()); + myEnv.putAll(env); + MapReduceChildJVM.setVMEnv(myEnv, remoteTask); // Set up the launch command List commands = MapReduceChildJVM.getVMCommand( - taskAttemptListener.getAddress(), remoteTask, - jvmID); - + taskAttemptListener.getAddress(), remoteTask, jvmID); + + // Duplicate the ByteBuffers for access by multiple containers. + Map myServiceData = new HashMap(); + for (Entry entry : commonContainerSpec + .getServiceData().entrySet()) { + myServiceData.put(entry.getKey(), entry.getValue().duplicate()); + } + // Construct the actual Container - ContainerLaunchContext container = BuilderUtils - .newContainerLaunchContext(containerID, conf - .get(MRJobConfig.USER_NAME), assignedCapability, localResources, - environment, commands, serviceData, tokens, applicationACLs); + ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext( + containerID, commonContainerSpec.getUser(), assignedCapability, + commonContainerSpec.getLocalResources(), myEnv, commands, + myServiceData, commonContainerSpec.getContainerTokens().duplicate(), + applicationACLs); return container; } @@ -1022,7 +1089,7 @@ public void transition(TaskAttemptImpl taskAttempt, private static class ContainerAssignedTransition implements SingleArcTransition { - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { @@ -1042,24 +1109,21 @@ public void transition(final TaskAttemptImpl taskAttempt, taskAttempt.jvmID = new WrappedJvmID( taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); - taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID); + taskAttempt.taskAttemptListener.registerPendingTask( + taskAttempt.remoteTask, taskAttempt.jvmID); //launch the container //create the container object to be launched for a given Task attempt - taskAttempt.eventHandler.handle( - new ContainerRemoteLaunchEvent(taskAttempt.attemptId, - taskAttempt.containerID, - taskAttempt.containerMgrAddress, taskAttempt.containerToken) { - @Override - public ContainerLaunchContext getContainer() { - return taskAttempt.createContainerLaunchContext(cEvent - .getApplicationACLs()); - } - @Override - public Task getRemoteTask() { // classic mapred Task, not YARN version - return taskAttempt.remoteTask; - } - }); + ContainerLaunchContext launchContext = createContainerLaunchContext( + cEvent.getApplicationACLs(), taskAttempt.containerID, + taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, + taskAttempt.oldJobId, taskAttempt.assignedCapability, + taskAttempt.jvmID, taskAttempt.taskAttemptListener, + taskAttempt.fsTokens); + taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent( + taskAttempt.attemptId, taskAttempt.containerID, + taskAttempt.containerMgrAddress, taskAttempt.containerToken, + launchContext, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle @@ -1135,10 +1199,9 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); - // register it to TaskAttemptListener so that it start listening - // for it - taskAttempt.taskAttemptListener.registerLaunchedTask( - taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); + // register it to TaskAttemptListener so that it can start monitoring it. + taskAttempt.taskAttemptListener + .registerLaunchedTask(taskAttempt.attemptId); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: @@ -1197,7 +1260,6 @@ private static class TaskCleanupTransition implements @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - @SuppressWarnings("deprecation") TaskAttemptContext taskContext = new TaskAttemptContextImpl(taskAttempt.conf, TypeConverter.fromYarn(taskAttempt.attemptId)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java index 59ab7f0708..466073610e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java @@ -24,17 +24,31 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerToken; -public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent { +public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent { + + private final ContainerLaunchContext container; + private final Task task; public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID, ContainerId containerID, String containerMgrAddress, - ContainerToken containerToken) { - super(taskAttemptID, containerID, containerMgrAddress, - containerToken, + ContainerToken containerToken, + ContainerLaunchContext containerLaunchContext, Task remoteTask) { + super(taskAttemptID, containerID, containerMgrAddress, containerToken, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH); + this.container = containerLaunchContext; + this.task = remoteTask; } - public abstract ContainerLaunchContext getContainer(); - public abstract Task getRemoteTask(); + public ContainerLaunchContext getContainer() { + return this.container; + } + public Task getRemoteTask() { + return this.task; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 5276276c4e..b138e9a661 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -104,10 +104,9 @@ public void init(Configuration conf) { @Override public void start() { scheduler= createSchedulerProxy(); - //LOG.info("Scheduler is " + scheduler); register(); startAllocatorThread(); - JobID id = TypeConverter.fromYarn(context.getApplicationID()); + JobID id = TypeConverter.fromYarn(this.applicationId); JobId jobId = TypeConverter.toYarn(id); job = context.getJob(jobId); super.start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 74e2c1b2a8..4adbfc6d8d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -30,18 +30,17 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -69,7 +68,7 @@ public class RMContainerAllocator extends RMContainerRequestor implements ContainerAllocator { - private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); + static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); public static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; @@ -77,7 +76,10 @@ public class RMContainerAllocator extends RMContainerRequestor private static final Priority PRIORITY_FAST_FAIL_MAP; private static final Priority PRIORITY_REDUCE; private static final Priority PRIORITY_MAP; - + + private Thread eventHandlingThread; + private volatile boolean stopEventHandling; + static { PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_FAST_FAIL_MAP.setPriority(5); @@ -130,7 +132,10 @@ added to the pending and are ramped up (added to scheduled) based private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; - + + BlockingQueue eventQueue + = new LinkedBlockingQueue(); + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); } @@ -155,6 +160,40 @@ public void init(Configuration conf) { retrystartTime = System.currentTimeMillis(); } + @Override + public void start() { + this.eventHandlingThread = new Thread() { + @SuppressWarnings("unchecked") + @Override + public void run() { + + ContainerAllocatorEvent event; + + while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { + try { + event = RMContainerAllocator.this.eventQueue.take(); + } catch (InterruptedException e) { + LOG.error("Returning, interrupted : " + e); + return; + } + + try { + handleEvent(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " to the ContainreAllocator", t); + // Kill the AM + eventHandler.handle(new JobEvent(getJob().getID(), + JobEventType.INTERNAL_ERROR)); + return; + } + } + } + }; + this.eventHandlingThread.start(); + super.start(); + } + @Override protected synchronized void heartbeat() throws Exception { LOG.info("Before Scheduling: " + getStat()); @@ -181,6 +220,8 @@ protected synchronized void heartbeat() throws Exception { @Override public void stop() { + this.stopEventHandling = true; + eventHandlingThread.interrupt(); super.stop(); LOG.info("Final Stats: " + getStat()); } @@ -192,10 +233,27 @@ public boolean getIsReduceStarted() { public void setIsReduceStarted(boolean reduceStarted) { this.reduceStarted = reduceStarted; } - - @SuppressWarnings("unchecked") + @Override - public synchronized void handle(ContainerAllocatorEvent event) { + public void handle(ContainerAllocatorEvent event) { + int qSize = eventQueue.size(); + if (qSize != 0 && qSize % 1000 == 0) { + LOG.info("Size of event-queue in RMContainerAllocator is " + qSize); + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue " + + "of RMContainerAllocator: " + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + throw new YarnException(e); + } + } + + @SuppressWarnings({ "unchecked" }) + protected synchronized void handleEvent(ContainerAllocatorEvent event) { LOG.info("Processing the event " + event.toString()); recalculateReduceSchedule = true; if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { @@ -206,9 +264,7 @@ public synchronized void handle(ContainerAllocatorEvent event) { int minSlotMemSize = getMinContainerCapability().getMemory(); mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize; - JobID id = TypeConverter.fromYarn(applicationId); - JobId jobId = TypeConverter.toYarn(id); - eventHandler.handle(new JobHistoryEvent(jobId, + eventHandler.handle(new JobHistoryEvent(getJob().getID(), new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceReqt))); LOG.info("mapResourceReqt:"+mapResourceReqt); @@ -232,9 +288,7 @@ public synchronized void handle(ContainerAllocatorEvent event) { //round off on slotsize reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize; - JobID id = TypeConverter.fromYarn(applicationId); - JobId jobId = TypeConverter.toYarn(id); - eventHandler.handle(new JobHistoryEvent(jobId, + eventHandler.handle(new JobHistoryEvent(getJob().getID(), new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, reduceResourceReqt))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index a5756da993..f26091ac64 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; @@ -68,33 +71,47 @@ public void testGetTask() throws IOException { JVMId id = new JVMId("foo",1, true, 1); WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. JvmContext context = new JvmContext(); context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - - //Now register the JVM, and see - listener.registerPendingTask(wid); - result = listener.getTask(context); - assertNull(result); - + + // Verify ask after registration but before launch TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID - listener.registerLaunchedTask(attemptID, task, wid); + listener.registerPendingTask(task, wid); + result = listener.getTask(context); + assertNotNull(result); + assertFalse(result.shouldDie); + // Unregister for more testing. + listener.unregister(attemptID, wid); + + // Verify ask after registration and launch + //Now put a task with the ID + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptID); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); - + // Don't unregister yet for more testing. + //Verify that if we call it again a second time we are told to die. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - + listener.unregister(attemptID, wid); + + // Verify after unregistration. + result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + listener.stop(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 11a8671707..f17bf6f8af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.EnumSet; @@ -65,7 +66,9 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; +import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.Clock; @@ -88,6 +91,7 @@ * Mock MRAppMaster. Doesn't start RPC servers. * No threads are started except of the event Dispatcher thread. */ +@SuppressWarnings("unchecked") public class MRApp extends MRAppMaster { private static final Log LOG = LogFactory.getLog(MRApp.class); @@ -173,7 +177,8 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, } public Job submit(Configuration conf) throws Exception { - String user = conf.get(MRJobConfig.USER_NAME, "mapred"); + String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation + .getCurrentUser().getShortUserName()); conf.set(MRJobConfig.USER_NAME, user); conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString()); conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true); @@ -187,6 +192,14 @@ public Job submit(Configuration conf) throws Exception { start(); DefaultMetricsSystem.shutdown(); Job job = getContext().getAllJobs().values().iterator().next(); + + // Write job.xml + String jobFile = MRApps.getJobFile(conf, user, + TypeConverter.fromYarn(job.getID())); + LOG.info("Writing job conf to " + jobFile); + new File(jobFile).getParentFile().mkdirs(); + conf.writeXml(new FileOutputStream(jobFile)); + return job; } @@ -308,16 +321,16 @@ protected TaskAttemptListener createTaskAttemptListener(AppContext context) { return new TaskAttemptListener(){ @Override public InetSocketAddress getAddress() { - return null; + return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} + public void registerLaunchedTask(TaskAttemptId attemptID) {} @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @Override - public void registerPendingTask(WrappedJvmID jvmID) { + public void registerPendingTask(org.apache.hadoop.mapred.Task task, + WrappedJvmID jvmID) { } }; } @@ -337,12 +350,14 @@ protected ContainerLauncher createContainerLauncher(AppContext context) { return new MockContainerLauncher(); } - class MockContainerLauncher implements ContainerLauncher { + protected class MockContainerLauncher implements ContainerLauncher { //We are running locally so set the shuffle port to -1 int shufflePort = -1; - @SuppressWarnings("unchecked") + public MockContainerLauncher() { + } + @Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) { @@ -474,6 +489,7 @@ static class TestInitTransition extends JobImpl.InitTransition { } @Override protected void setup(JobImpl job) throws IOException { + super.setup(job); job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces); job.remoteJobConfFile = new Path("test"); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index e4bac1ffd5..1e9631696a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -40,6 +40,7 @@ /** * Tests the state machine of MR App. */ +@SuppressWarnings("unchecked") public class TestMRApp { @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index a4b84b2b53..db1f20fa49 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -1186,12 +1186,12 @@ public void sendRequest(ContainerRequestEvent req) { public void sendRequests(List reqs) { for (ContainerRequestEvent req : reqs) { - super.handle(req); + super.handleEvent(req); } } public void sendFailure(ContainerFailedEvent f) { - super.handle(f); + super.handleEvent(f); } // API to be used by tests diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java new file mode 100644 index 0000000000..ea0a342d62 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.impl; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.junit.Test; + +public class TestMapReduceChildJVM { + + private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class); + + @Test + public void testCommandLine() throws Exception { + + MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true); + Job job = app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + Assert.assertEquals( + "[exec $JAVA_HOME/bin/java" + + " -Djava.net.preferIPv4Stack=true" + + " -Dhadoop.metrics.log.level=WARN" + + " -Xmx200m -Djava.io.tmpdir=$PWD/tmp" + + " -Dlog4j.configuration=container-log4j.properties" + + " -Dyarn.app.mapreduce.container.log.dir=" + + " -Dyarn.app.mapreduce.container.log.filesize=0" + + " -Dhadoop.root.logger=INFO,CLA" + + " org.apache.hadoop.mapred.YarnChild 127.0.0.1" + + " 54321" + + " attempt_0_0000_m_000000_0" + + " 0" + + " 1>/stdout" + + " 2>/stderr ]", app.myCommandLine); + } + + private static final class MyMRApp extends MRApp { + + private String myCommandLine; + + public MyMRApp(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + } + + @Override + protected ContainerLauncher createContainerLauncher(AppContext context) { + return new MockContainerLauncher() { + @Override + public void handle(ContainerLauncherEvent event) { + if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) { + ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event; + ContainerLaunchContext launchContext = launchEvent.getContainer(); + String cmdString = launchContext.getCommands().toString(); + LOG.info("launchContext " + cmdString); + myCommandLine = cmdString; + } + super.handle(event); + } + }; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index e052b2527c..637ae1beff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; +@SuppressWarnings("unchecked") public class TestTaskAttempt{ @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 066f0af646..d2b9354b42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -103,7 +103,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { (TaskAttemptID id, TaskType taskType, String status, long finishTime, String hostname, String error) { - this(id, taskType, status, finishTime, hostname, -1, null, error, null); + this(id, taskType, status, finishTime, hostname, -1, "", error, null); } TaskAttemptUnsuccessfulCompletionEvent() {} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java index a7d91928c4..c3a084d239 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java @@ -19,9 +19,7 @@ package org.apache.hadoop.mapreduce.security; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,9 +33,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Master; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -90,7 +86,7 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, obtainTokensForNamenodesInternal(fs, credentials, conf); } } - + /** * get delegation token for a specific FS * @param fs @@ -99,6 +95,7 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, * @param conf * @throws IOException */ + @SuppressWarnings("deprecation") static void obtainTokensForNamenodesInternal(FileSystem fs, Credentials credentials, Configuration conf) throws IOException { String delegTokenRenewer = Master.getMasterPrincipal(conf); @@ -131,7 +128,8 @@ static void obtainTokensForNamenodesInternal(FileSystem fs, return; } } - List> tokens = fs.getDelegationTokens(delegTokenRenewer); + List> tokens = + fs.getDelegationTokens(delegTokenRenewer, credentials); if (tokens != null) { for (Token token : tokens) { credentials.addToken(token.getService(), token); @@ -141,13 +139,13 @@ static void obtainTokensForNamenodesInternal(FileSystem fs, } //Call getDelegationToken as well for now - for FS implementations // which may not have implmented getDelegationTokens (hftp) - Token token = fs.getDelegationToken(delegTokenRenewer); - if (token != null) { - Text fsNameText = new Text(fsName); - token.setService(fsNameText); - credentials.addToken(fsNameText, token); - LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + - ";t.service="+token.getService()); + if (tokens == null || tokens.size() == 0) { + Token token = fs.getDelegationToken(delegTokenRenewer); + if (token != null) { + credentials.addToken(token.getService(), token); + LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName + + ";t.service=" + token.getService()); + } } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java index bdd5bed4f3..5df6ede9cb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.text.ParseException; +import java.util.Random; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapreduce.FileSystemCounter; @@ -98,6 +99,37 @@ public void testCounters() throws IOException { } } + /** + * Verify counter value works + */ + @SuppressWarnings("deprecation") + @Test + public void testCounterValue() { + Counters counters = new Counters(); + final int NUMBER_TESTS = 100; + final int NUMBER_INC = 10; + final Random rand = new Random(); + for (int i = 0; i < NUMBER_TESTS; i++) { + long initValue = rand.nextInt(); + long expectedValue = initValue; + Counter counter = counters.findCounter("foo", "bar"); + counter.setValue(initValue); + assertEquals("Counter value is not initialized correctly", + expectedValue, counter.getValue()); + for (int j = 0; j < NUMBER_INC; j++) { + int incValue = rand.nextInt(); + counter.increment(incValue); + expectedValue += incValue; + assertEquals("Counter value is not incremented correctly", + expectedValue, counter.getValue()); + } + expectedValue = rand.nextInt(); + counter.setValue(expectedValue); + assertEquals("Counter value is not set correctly", + expectedValue, counter.getValue()); + } + } + @SuppressWarnings("deprecation") @Test public void testLegacyNames() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java new file mode 100644 index 0000000000..5d2101b229 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.security; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.URI; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Master; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestTokenCache { + + @Test + @SuppressWarnings("deprecation") + public void testGetDelegationTokensNotImplemented() throws Exception { + Credentials credentials = new Credentials(); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); + String renewer = Master.getMasterPrincipal(conf); + + FileSystem fs = setupSingleFsWithoutGetDelegationTokens(); + TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf); + assertEquals(1, credentials.getAllTokens().size()); + + verify(fs).getDelegationTokens(renewer, credentials); + verify(fs).getDelegationToken(renewer); + } + + @Test + public void testManagedFileSystem() throws Exception { + Credentials credentials = new Credentials(); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); + String renewer = Master.getMasterPrincipal(conf); + + FileSystem singleFs = setupSingleFs(); + FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials); + + TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf); + assertEquals(1, credentials.getAllTokens().size()); + + TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf); + assertEquals(1, credentials.getAllTokens().size()); + + TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf); + assertEquals(2, credentials.getAllTokens().size()); + + TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf); + assertEquals(2, credentials.getAllTokens().size()); + + verify(singleFs, times(1)).getDelegationTokens(renewer, credentials); + verify(multiFs, times(2)).getDelegationTokens(renewer, credentials); + // A call to getDelegationToken would have generated an exception. + } + + @SuppressWarnings("deprecation") + private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception { + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4"); + when(mockFs.getUri()).thenReturn(new URI("singlefs4:///")); + + final Token mockToken = (Token) mock(Token.class); + when(mockToken.getService()).thenReturn(new Text("singlefs4")); + + when(mockFs.getDelegationToken(any(String.class))).thenAnswer( + new Answer>() { + @Override + public Token answer(InvocationOnMock invocation) throws Throwable { + return mockToken; + } + }); + + when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class))) + .thenReturn(new LinkedList>()); + + return mockFs; + } + + private FileSystem setupSingleFs() throws Exception { + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1"); + when(mockFs.getUri()).thenReturn(new URI("singlefs1:///")); + + List> tokens = new LinkedList>(); + Token mockToken = mock(Token.class); + when(mockToken.getService()).thenReturn(new Text("singlefs1")); + tokens.add(mockToken); + + when(mockFs.getDelegationTokens(any(String.class))).thenThrow( + new RuntimeException( + "getDelegationTokens(renewer) should not be called")); + when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class))) + .thenReturn(tokens); + + return mockFs; + } + + private FileSystem setupMultiFs(final FileSystem singleFs, + final String renewer, final Credentials credentials) throws Exception { + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.getCanonicalServiceName()).thenReturn("multifs"); + when(mockFs.getUri()).thenReturn(new URI("multifs:///")); + + when(mockFs.getDelegationTokens(any(String.class))).thenThrow( + new RuntimeException( + "getDelegationTokens(renewer) should not be called")); + when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer( + new Answer>>() { + + @Override + public List> answer(InvocationOnMock invocation) + throws Throwable { + List> newTokens = new LinkedList>(); + if (credentials.getToken(new Text("singlefs1")) == null) { + newTokens.addAll(singleFs.getDelegationTokens(renewer, + credentials)); + } else { + newTokens.add(credentials.getToken(new Text("singlefs1"))); + } + Token mockToken2 = mock(Token.class); + when(mockToken2.getService()).thenReturn(new Text("singlefs2")); + newTokens.add(mockToken2); + return newTokens; + } + }); + + return mockFs; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java index 586533ec2a..b857bf309b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java @@ -29,7 +29,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.lib.ZKClient; -import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnLog; @@ -45,7 +45,8 @@ public class TestZKClient { protected String hostPort = "127.0.0.1:2000"; protected int maxCnxns = 0; - protected NIOServerCnxn.Factory factory = null; + protected NIOServerCnxnFactory factory = null; + protected ZooKeeperServer zks; protected File tmpDir = null; public static String send4LetterWord(String host, int port, String cmd) @@ -144,10 +145,11 @@ public void setUp() throws IOException, InterruptedException { BASETEST.mkdirs(); } File dataDir = createTmpDir(BASETEST); - ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000); + zks = new ZooKeeperServer(dataDir, dataDir, 3000); final int PORT = Integer.parseInt(hostPort.split(":")[1]); if (factory == null) { - factory = new NIOServerCnxn.Factory(new InetSocketAddress(PORT),maxCnxns); + factory = new NIOServerCnxnFactory(); + factory.configure(new InetSocketAddress(PORT), maxCnxns); } factory.startup(zks); Assert.assertTrue("waiting for server up", @@ -158,8 +160,8 @@ public void setUp() throws IOException, InterruptedException { @After public void tearDown() throws IOException, InterruptedException { - if (factory != null) { - ZKDatabase zkDb = factory.getZooKeeperServer().getZKDatabase(); + if (zks != null) { + ZKDatabase zkDb = zks.getZKDatabase(); factory.shutdown(); try { zkDb.close(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/pom.xml index 2c9ab3d970..df178ed197 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/pom.xml @@ -310,7 +310,7 @@ org.apache.zookeeper zookeeper - 3.3.1 + 3.4.2 diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java index 791b92c82f..1bac8de3fe 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java @@ -3210,7 +3210,7 @@ private void failedTask(TaskInProgress tip, TaskAttemptID taskid, (taskid, taskType, taskStatus.getRunState().toString(), finishTime, - taskTrackerHostName, -1, null, diagInfo, + taskTrackerHostName, -1, "", diagInfo, splits.burst()); jobHistory.logEvent(tue, taskid.getJobID());