diff --git a/hadoop-common-project/dev-support/test-patch.properties b/hadoop-common-project/dev-support/test-patch.properties index 15b54bfcf0..c33b2a9440 100644 --- a/hadoop-common-project/dev-support/test-patch.properties +++ b/hadoop-common-project/dev-support/test-patch.properties @@ -18,4 +18,4 @@ OK_RELEASEAUDIT_WARNINGS=0 OK_FINDBUGS_WARNINGS=0 -OK_JAVADOC_WARNINGS=6 +OK_JAVADOC_WARNINGS=13 diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b34432fc55..30fa5f25da 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -74,7 +74,11 @@ Trunk (unreleased changes) HADOOP-7987. Support setting the run-as user in unsecure mode. (jitendra) + HADOOP-7965. Support for protocol version and signature in PB. (jitendra) + BUGS + HADOOP-7998. CheckFileSystem does not correctly honor setVerifyChecksum + (Daryn Sharp via bobby) HADOOP-7851. Configuration.getClasses() never returns the default value. (Uma Maheswara Rao G via amarrk) @@ -209,6 +213,8 @@ Release 0.23.1 - Unreleased HADOOP-7919. Remove the unused hadoop.logfile.* properties from the core-default.xml file. (harsh) + HADOOP-7939. Improve Hadoop subcomponent integration in Hadoop 0.23. (rvs via tucu) + OPTIMIZATIONS BUG FIXES @@ -290,6 +296,15 @@ Release 0.23.1 - Unreleased HADOOP-7981. Improve documentation for org.apache.hadoop.io.compress. Decompressor.getRemaining (Jonathan Eagles via mahadev) + HADOOP-7997. SequenceFile.createWriter(...createParent...) no + longer works on existing file. (Gregory Chanan via eli) + + HADOOP-7993. Hadoop ignores old-style config options for enabling compressed + output. (Anupam Seth via mahadev) + + HADOOP-8000. fetchdt command not available in bin/hadoop. + (Arpit Gupta via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index 190677248e..115d7124ef 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -274,4 +274,8 @@ + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop index e01f205178..a121f3c268 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop @@ -51,7 +51,7 @@ fi COMMAND=$1 case $COMMAND in #hdfs commands - namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer) + namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt) echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2 echo "Instead use the hdfs command for it." 1>&2 echo "" 1>&2 diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh index 71c9481714..c8ecc42664 100644 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh @@ -25,9 +25,21 @@ common_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P) script="$(basename -- "$this")" this="$common_bin/$script" +[ -f "$common_bin/hadoop-layout.sh" ] && . 
"$common_bin/hadoop-layout.sh" + +HADOOP_COMMON_DIR=${HADOOP_COMMON_DIR:-"share/hadoop/common"} +HADOOP_COMMON_LIB_JARS_DIR=${HADOOP_COMMON_LIB_JARS_DIR:-"share/hadoop/common/lib"} +HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_COMMON_LIB_NATIVE_DIR:-"lib/native"} +HDFS_DIR=${HDFS_DIR:-"share/hadoop/hdfs"} +HDFS_LIB_JARS_DIR=${HDFS_LIB_JARS_DIR:-"share/hadoop/hdfs/lib"} +YARN_DIR=${YARN_DIR:-"share/hadoop/mapreduce"} +YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"} +MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"} +MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"} + # the root of the Hadoop installation # See HADOOP-6255 for directory structure layout -HADOOP_DEFAULT_PREFIX=`dirname "$this"`/.. +HADOOP_DEFAULT_PREFIX=$(cd -P -- "$common_bin"/.. && pwd -P) HADOOP_PREFIX=${HADOOP_PREFIX:-$HADOOP_DEFAULT_PREFIX} export HADOOP_PREFIX @@ -144,16 +156,22 @@ CLASSPATH="${HADOOP_CONF_DIR}" # so that filenames w/ spaces are handled correctly in loops below IFS= +if [ "$HADOOP_COMMON_HOME" = "" ]; then + if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_DIR" ]; then + HADOOP_COMMON_HOME=$HADOOP_PREFIX + fi +fi + # for releases, add core hadoop jar & webapps to CLASSPATH -if [ -d "$HADOOP_PREFIX/share/hadoop/common/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/webapps +if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR/webapps" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR fi -if [ -d "$HADOOP_PREFIX/share/hadoop/common/lib" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/lib'/*' +if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR'/*' fi -CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common'/*' +CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR'/*' # add user-specified CLASSPATH last if [ "$HADOOP_CLASSPATH" != "" ]; then @@ -185,13 +203,13 @@ fi # setup 'java.library.path' for native-hadoop code if necessary -if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/lib/native" ]; then +if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then - if [ -d "${HADOOP_PREFIX}/lib/native" ]; then + if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then - JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/lib/native + JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR else - JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib/native + JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR fi fi fi @@ -216,37 +234,56 @@ HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true" # put hdfs in classpath if present if [ "$HADOOP_HDFS_HOME" = "" ]; then - if [ -d "${HADOOP_PREFIX}/share/hadoop/hdfs" ]; then + if [ -d "${HADOOP_PREFIX}/$HDFS_DIR" ]; then HADOOP_HDFS_HOME=$HADOOP_PREFIX fi fi -if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs +if [ -d "$HADOOP_HDFS_HOME/$HDFS_DIR/webapps" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR fi -if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib'/*' +if [ -d "$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR'/*' fi -CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs'/*' 
+CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR'/*' # put yarn in classpath if present if [ "$YARN_HOME" = "" ]; then - if [ -d "${HADOOP_PREFIX}/share/hadoop/mapreduce" ]; then + if [ -d "${HADOOP_PREFIX}/$YARN_DIR" ]; then YARN_HOME=$HADOOP_PREFIX fi fi -if [ -d "$YARN_HOME/share/hadoop/mapreduce/webapps" ]; then - CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce +if [ -d "$YARN_HOME/$YARN_DIR/webapps" ]; then + CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR fi -if [ -d "$YARN_HOME/share/hadoop/mapreduce/lib" ]; then - CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib'/*' +if [ -d "$YARN_HOME/$YARN_LIB_JARS_DIR" ]; then + CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_LIB_JARS_DIR'/*' fi -CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce'/*' +CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR'/*' + +# put mapred in classpath if present AND different from YARN +if [ "$HADOOP_MAPRED_HOME" = "" ]; then + if [ -d "${HADOOP_PREFIX}/$MAPRED_DIR" ]; then + HADOOP_MAPRED_HOME=$HADOOP_PREFIX + fi +fi + +if [ "$HADOOP_MAPRED_HOME/$MAPRED_DIR" != "$YARN_HOME/$YARN_DIR" ] ; then + if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_DIR/webapps" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR + fi + + if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR'/*' + fi + + CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR'/*' +fi # cygwin path translation if $cygwin; then diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java index 79e0793253..84a3187119 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -345,7 +345,17 @@ public class Configuration implements Iterable>, } return name; } - + + private void handleDeprecation() { + LOG.debug("Handling deprecation for all properties in config..."); + Set keys = new HashSet(); + keys.addAll(getProps().keySet()); + for (Object item: keys) { + LOG.debug("Handling deprecation for " + (String)item); + handleDeprecation((String)item); + } + } + static{ //print deprecation warning if hadoop-site.xml is found in classpath ClassLoader cL = Thread.currentThread().getContextClassLoader(); @@ -1667,7 +1677,7 @@ public class Configuration implements Iterable>, Element conf = doc.createElement("configuration"); doc.appendChild(conf); conf.appendChild(doc.createTextNode("\n")); - getProps(); // ensure properties is set + handleDeprecation(); //ensure properties is set and deprecation is handled for (Enumeration e = properties.keys(); e.hasMoreElements();) { String name = (String)e.nextElement(); Object object = properties.get(name); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index f24c3924ca..de1178930f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -304,8 +304,9 @@ public abstract class ChecksumFileSystem extends FilterFileSystem { */ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return new FSDataInputStream( - new 
ChecksumFSInputChecker(this, f, bufferSize)); + return verifyChecksum + ? new FSDataInputStream(new ChecksumFSInputChecker(this, f, bufferSize)) + : getRawFileSystem().open(f, bufferSize); } /** {@inheritDoc} */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index a64bd1bf9e..476eaeb14b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -467,7 +467,7 @@ public class SequenceFile { Metadata metadata) throws IOException { return createWriter(FileContext.getFileContext(fs.getUri(), conf), conf, name, keyClass, valClass, compressionType, codec, - metadata, EnumSet.of(CreateFlag.CREATE), + metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE), CreateOpts.bufferSize(bufferSize), createParent ? CreateOpts.createParent() : CreateOpts.donotCreateParent(), diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index f422960dc7..3f94abfbb2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.io.retry; -import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Collections; import java.util.Map; @@ -29,8 +28,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.util.ThreadUtil; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RpcInvocationHandler; -class RetryInvocationHandler implements InvocationHandler, Closeable { +class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private FailoverProxyProvider proxyProvider; @@ -160,4 +161,11 @@ class RetryInvocationHandler implements InvocationHandler, Closeable { proxyProvider.close(); } + @Override //RpcInvocationHandler + public ConnectionId getConnectionId() { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(currentProxy); + return inv.getConnectionId(); + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index e0f921fd96..9b3862abdb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -18,11 +18,9 @@ package org.apache.hadoop.ipc; -import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; @@ -37,6 +35,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto; @@ -51,7 +50,6 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -80,8 +78,19 @@ public class ProtobufRpcEngine implements RpcEngine { .getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)), false); } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + Class protocol = ProtocolMetaInfoPB.class; + return new ProtocolProxy(protocol, + (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new Invoker(protocol, connId, conf, + factory)), false); + } - private static class Invoker implements InvocationHandler, Closeable { + private static class Invoker implements RpcInvocationHandler { private final Map returnTypes = new ConcurrentHashMap(); private boolean isClosed = false; @@ -93,12 +102,20 @@ public class ProtobufRpcEngine implements RpcEngine { public Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); - this.client = CLIENTS.getClient(conf, factory, - RpcResponseWritable.class); - this.clientProtocolVersion = RPC.getProtocolVersion(protocol); + this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf), conf, factory); + } + + /** + * This constructor takes a connectionId, instead of creating a new one. + */ + public Invoker(Class protocol, Client.ConnectionId connId, + Configuration conf, SocketFactory factory) { + this.remoteId = connId; + this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class); this.protocolName = RPC.getProtocolName(protocol); + this.clientProtocolVersion = RPC + .getProtocolVersion(protocol); } private HadoopRpcRequestProto constructRpcRequest(Method method, @@ -222,6 +239,11 @@ public class ProtobufRpcEngine implements RpcEngine { returnTypes.put(method.getName(), prototype); return prototype; } + + @Override //RpcInvocationHandler + public ConnectionId getConnectionId() { + return remoteId; + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java new file mode 100644 index 0000000000..968f3d0231 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; + +/** + * Protocol to get versions and signatures for supported protocols from the + * server. + * + * Note: This extends the protocolbuffer service based interface to + * add annotations. + */ +@ProtocolInfo( + protocolName = "org.apache.hadoop.ipc.ProtocolMetaInfoPB", + protocolVersion = 1) +public interface ProtocolMetaInfoPB extends + ProtocolInfoService.BlockingInterface { +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java new file mode 100644 index 0000000000..aaf71f8a4e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolVersionProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class serves the requests for protocol versions and signatures by + * looking them up in the server registry. 
+ */ +public class ProtocolMetaInfoServerSideTranslatorPB implements + ProtocolMetaInfoPB { + + RPC.Server server; + + public ProtocolMetaInfoServerSideTranslatorPB(RPC.Server server) { + this.server = server; + } + + @Override + public GetProtocolVersionsResponseProto getProtocolVersions( + RpcController controller, GetProtocolVersionsRequestProto request) + throws ServiceException { + String protocol = request.getProtocol(); + GetProtocolVersionsResponseProto.Builder builder = + GetProtocolVersionsResponseProto.newBuilder(); + for (RpcKind r : RpcKind.values()) { + long[] versions; + try { + versions = getProtocolVersionForRpcKind(r, protocol); + } catch (ClassNotFoundException e) { + throw new ServiceException(e); + } + ProtocolVersionProto.Builder b = ProtocolVersionProto.newBuilder(); + if (versions != null) { + b.setRpcKind(r.toString()); + for (long v : versions) { + b.addVersions(v); + } + } + builder.addProtocolVersions(b.build()); + } + return builder.build(); + } + + @Override + public GetProtocolSignatureResponseProto getProtocolSignature( + RpcController controller, GetProtocolSignatureRequestProto request) + throws ServiceException { + GetProtocolSignatureResponseProto.Builder builder = GetProtocolSignatureResponseProto + .newBuilder(); + String protocol = request.getProtocol(); + String rpcKind = request.getRpcKind(); + long[] versions; + try { + versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind), + protocol); + } catch (ClassNotFoundException e1) { + throw new ServiceException(e1); + } + if (versions == null) { + return builder.build(); + } + for (long v : versions) { + ProtocolSignatureProto.Builder sigBuilder = ProtocolSignatureProto + .newBuilder(); + sigBuilder.setVersion(v); + try { + ProtocolSignature signature = ProtocolSignature.getProtocolSignature( + protocol, v); + for (int m : signature.getMethods()) { + sigBuilder.addMethods(m); + } + } catch (ClassNotFoundException e) { + throw new ServiceException(e); + } + builder.addProtocolSignature(sigBuilder.build()); + } + return builder.build(); + } + + private long[] getProtocolVersionForRpcKind(RpcKind rpcKind, + String protocol) throws ClassNotFoundException { + Class protocolClass = Class.forName(protocol); + String protocolName = RPC.getProtocolName(protocolClass); + VerProtocolImpl[] vers = server.getSupportedProtocolVersions(rpcKind, + protocolName); + if (vers == null) { + return null; + } + long [] versions = new long[vers.length]; + for (int i=0; i protocol, long serverVersion) { + Class protocol, long serverVersion) { String protocolName = RPC.getProtocolName(protocol); synchronized (PROTOCOL_FINGERPRINT_CACHE) { ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName); @@ -221,6 +221,12 @@ public class ProtocolSignature implements Writable { return sig.signature; } + public static ProtocolSignature getProtocolSignature(String protocolName, + long version) throws ClassNotFoundException { + Class protocol = Class.forName(protocolName); + return getSigFingerprint(protocol, version).signature; + } + /** * Get a server protocol's signature * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 321c1d8ef3..4f85e905cd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -41,6 +41,7 @@ import org.apache.commons.logging.*; import 
org.apache.hadoop.io.*; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +50,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; +import com.google.protobuf.BlockingService; + /** A simple RPC mechanism. * * A protocol is a Java interface. All parameters and return types must @@ -177,8 +180,8 @@ public class RPC { } // return the RpcEngine configured to handle a protocol - private static synchronized RpcEngine getProtocolEngine(Class protocol, - Configuration conf) { + static synchronized RpcEngine getProtocolEngine(Class protocol, + Configuration conf) { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), @@ -522,7 +525,16 @@ public class RPC { return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); } - + + /** + * Returns the server address for a given proxy. + */ + public static InetSocketAddress getServerAddress(Object proxy) { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(proxy); + return inv.getConnectionId().getAddress(); + } + /** * Get a protocol proxy that contains a proxy connection to a remote server * and a set of methods that are supported by the server @@ -817,6 +829,19 @@ public class RPC { SecretManager secretManager) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager); + initProtocolMetaInfo(conf); + } + + private void initProtocolMetaInfo(Configuration conf) + throws IOException { + RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class, + ProtobufRpcEngine.class); + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(this); + BlockingService protocolInfoBlockingService = ProtocolInfoService + .newReflectiveBlockingService(xlator); + addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class, + protocolInfoBlockingService); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java new file mode 100644 index 0000000000..cdbc034ea2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.net.NetUtils; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class maintains a cache of protocol versions and corresponding protocol + * signatures, keyed by server address, protocol and rpc kind. + * The cache is lazily populated. + */ +public class RpcClientUtil { + private static RpcController NULL_CONTROLLER = null; + private static final int PRIME = 16777619; + + private static class ProtoSigCacheKey { + private InetSocketAddress serverAddress; + private String protocol; + private String rpcKind; + + ProtoSigCacheKey(InetSocketAddress addr, String p, String rk) { + this.serverAddress = addr; + this.protocol = p; + this.rpcKind = rk; + } + + @Override //Object + public int hashCode() { + int result = 1; + result = PRIME * result + + ((serverAddress == null) ? 0 : serverAddress.hashCode()); + result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); + result = PRIME * result + ((rpcKind == null) ? 0 : rpcKind.hashCode()); + return result; + } + + @Override //Object + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof ProtoSigCacheKey) { + ProtoSigCacheKey otherKey = (ProtoSigCacheKey) other; + return (serverAddress.equals(otherKey.serverAddress) && + protocol.equals(otherKey.protocol) && + rpcKind.equals(otherKey.rpcKind)); + } + return false; + } + } + + private static ConcurrentHashMap> + signatureMap = new ConcurrentHashMap>(); + + private static void putVersionSignatureMap(InetSocketAddress addr, + String protocol, String rpcKind, Map map) { + signatureMap.put(new ProtoSigCacheKey(addr, protocol, rpcKind), map); + } + + private static Map getVersionSignatureMap( + InetSocketAddress addr, String protocol, String rpcKind) { + return signatureMap.get(new ProtoSigCacheKey(addr, protocol, rpcKind)); + } + + /** + * Returns whether the given method is supported or not. + * The protocol signatures are fetched and cached. The connection id for the + * proxy provided is re-used. + * @param rpcProxy Proxy which provides an existing connection id. + * @param protocol Protocol for which the method check is required. + * @param rpcKind The RpcKind for which the method check is required. + * @param version The version at the client. + * @param methodName Name of the method. + * @return true if the method is supported, false otherwise. 
+ * @throws IOException + */ + public static boolean isMethodSupported(Object rpcProxy, Class protocol, + RpcKind rpcKind, long version, String methodName) throws IOException { + InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy); + Map versionMap = getVersionSignatureMap( + serverAddress, protocol.getName(), rpcKind.toString()); + + if (versionMap == null) { + Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class, + ProtobufRpcEngine.class); + ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy, + conf); + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + GetProtocolSignatureResponseProto resp; + try { + resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER, + builder.build()); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + versionMap = convertProtocolSignatureProtos(resp + .getProtocolSignatureList()); + putVersionSignatureMap(serverAddress, protocol.getName(), + rpcKind.toString(), versionMap); + } + // Assuming unique method names. + Method desiredMethod; + Method[] allMethods = protocol.getMethods(); + desiredMethod = null; + for (Method m : allMethods) { + if (m.getName().equals(methodName)) { + desiredMethod = m; + break; + } + } + if (desiredMethod == null) { + return false; + } + int methodHash = ProtocolSignature.getFingerprint(desiredMethod); + return methodExists(methodHash, version, versionMap); + } + + private static Map + convertProtocolSignatureProtos(List protoList) { + Map map = new TreeMap(); + for (ProtocolSignatureProto p : protoList) { + int [] methods = new int[p.getMethodsList().size()]; + int index=0; + for (int m : p.getMethodsList()) { + methods[index++] = m; + } + map.put(p.getVersion(), new ProtocolSignature(p.getVersion(), methods)); + } + return map; + } + + private static boolean methodExists(int methodHash, long version, + Map versionMap) { + ProtocolSignature sig = versionMap.get(version); + if (sig != null) { + for (int m : sig.getMethods()) { + if (m == methodHash) { + return true; + } + } + } + return false; + } + + // The proxy returned re-uses the underlying connection. This is a special + // mechanism for ProtocolMetaInfoPB. + // Don't do this for any other protocol, it might cause a security hole. 
+ private static ProtocolMetaInfoPB getProtocolMetaInfoProxy(Object proxy, + Configuration conf) throws IOException { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(proxy); + return RPC + .getProtocolEngine(ProtocolMetaInfoPB.class, conf) + .getProtocolMetaInfoProxy(inv.getConnectionId(), conf, + NetUtils.getDefaultSocketFactory(conf)).getProxy(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index a9076e7d1e..0fc7d60bd3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -26,6 +26,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -54,4 +55,16 @@ public interface RpcEngine { SecretManager secretManager ) throws IOException; + /** + * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection + * id. + * @param connId, ConnectionId to be used for the proxy. + * @param conf, Configuration. + * @param factory, Socket factory. + * @return Proxy object. + * @throws IOException + */ + ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java new file mode 100644 index 0000000000..6bcd757357 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.Closeable; +import java.lang.reflect.InvocationHandler; + +import org.apache.hadoop.ipc.Client.ConnectionId; + +/** + * This interface must be implemented by all InvocationHandler + * implementations. + */ +public interface RpcInvocationHandler extends InvocationHandler, Closeable { + + /** + * Returns the connection id associated with the InvocationHandler instance. 
+ * @return ConnectionId + */ + ConnectionId getConnectionId(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 19a496809b..fc0da0cf90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -21,18 +21,17 @@ package org.apache.hadoop.ipc; import java.lang.reflect.Proxy; import java.lang.reflect.Method; import java.lang.reflect.Array; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.io.*; -import java.io.Closeable; import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; @@ -202,7 +201,7 @@ public class WritableRpcEngine implements RpcEngine { private static ClientCache CLIENTS=new ClientCache(); - private static class Invoker implements InvocationHandler, Closeable { + private static class Invoker implements RpcInvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; @@ -239,6 +238,11 @@ public class WritableRpcEngine implements RpcEngine { CLIENTS.stopClient(client); } } + + @Override + public ConnectionId getConnectionId() { + return remoteId; + } } // for unit testing only @@ -524,4 +528,11 @@ public class WritableRpcEngine implements RpcEngine { } } } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto b/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto new file mode 100644 index 0000000000..53046aaffd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "ProtocolInfoProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +/** + * Request to get protocol versions for all supported rpc kinds. 
+ */ +message GetProtocolVersionsRequestProto { + required string protocol = 1; // Protocol name +} + +/** + * Protocol version with corresponding rpc kind. + */ +message ProtocolVersionProto { + required string rpcKind = 1; //RPC kind + repeated uint64 versions = 2; //Protocol version corresponding to the rpc kind. +} + +/** + * Get protocol version response. + */ +message GetProtocolVersionsResponseProto { + repeated ProtocolVersionProto protocolVersions = 1; +} + +/** + * Get protocol signature request. + */ +message GetProtocolSignatureRequestProto { + required string protocol = 1; // Protocol name + required string rpcKind = 2; // RPC kind +} + +/** + * Get protocol signature response. + */ +message GetProtocolSignatureResponseProto { + repeated ProtocolSignatureProto protocolSignature = 1; +} + +message ProtocolSignatureProto { + required uint64 version = 1; + repeated uint32 methods = 2; +} + +/** + * Protocol to get information about protocols. + */ +service ProtocolInfoService { + /** + * Return protocol version corresponding to protocol interface for each + * supported rpc kind. + */ + rpc getProtocolVersions(GetProtocolVersionsRequestProto) + returns (GetProtocolVersionsResponseProto); + + /** + * Return protocol version corresponding to protocol interface. + */ + rpc getProtocolSignature(GetProtocolSignatureRequestProto) + returns (GetProtocolSignatureResponseProto); +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestDeprecatedKeys.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestDeprecatedKeys.java index 7008544f7b..8631771b9d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestDeprecatedKeys.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestDeprecatedKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.conf; +import java.io.ByteArrayOutputStream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -32,4 +34,22 @@ public class TestDeprecatedKeys extends TestCase { String scriptFile = conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY); assertTrue(scriptFile.equals("xyz")) ; } + + //Tests reading / writing a conf file with deprecation after setting + public void testReadWriteWithDeprecatedKeys() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean("old.config.yet.to.be.deprecated", true); + Configuration.addDeprecation("old.config.yet.to.be.deprecated", + new String[]{"new.conf.to.replace.deprecated.conf"}); + ByteArrayOutputStream out=new ByteArrayOutputStream(); + String fileContents; + try { + conf.writeXml(out); + fileContents = out.toString(); + } finally { + out.close(); + } + assertTrue(fileContents.contains("old.config.yet.to.be.deprecated")); + assertTrue(fileContents.contains("new.conf.to.replace.deprecated.conf")); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java index 373bdf12d5..80347a72b4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java @@ -22,12 +22,22 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import static 
org.apache.hadoop.fs.FileSystemTestHelper.*; import org.apache.hadoop.conf.Configuration; -import junit.framework.TestCase; +import org.junit.*; +import static org.junit.Assert.*; -public class TestChecksumFileSystem extends TestCase { +public class TestChecksumFileSystem { static final String TEST_ROOT_DIR = System.getProperty("test.build.data","build/test/data/work-dir/localfs"); + static LocalFileSystem localFs; + + @Before + public void resetLocalFs() throws Exception { + localFs = FileSystem.getLocal(new Configuration()); + localFs.setVerifyChecksum(true); + } + + @Test public void testgetChecksumLength() throws Exception { assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512)); assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512)); @@ -40,9 +50,8 @@ public class TestChecksumFileSystem extends TestCase { ChecksumFileSystem.getChecksumLength(10000000000000L, 10)); } + @Test public void testVerifyChecksum() throws Exception { - Configuration conf = new Configuration(); - LocalFileSystem localFs = FileSystem.getLocal(conf); Path testPath = new Path(TEST_ROOT_DIR, "testPath"); Path testPath11 = new Path(TEST_ROOT_DIR, "testPath11"); FSDataOutputStream fout = localFs.create(testPath); @@ -68,7 +77,7 @@ public class TestChecksumFileSystem extends TestCase { //copying the wrong checksum file FileUtil.copy(localFs, localFs.getChecksumFile(testPath11), localFs, - localFs.getChecksumFile(testPath),false,true,conf); + localFs.getChecksumFile(testPath),false,true,localFs.getConf()); assertTrue("checksum exists", localFs.exists(localFs.getChecksumFile(testPath))); boolean errorRead = false; @@ -80,20 +89,13 @@ public class TestChecksumFileSystem extends TestCase { assertTrue("error reading", errorRead); //now setting verify false, the read should succeed - try { - localFs.setVerifyChecksum(false); - String str = readFile(localFs, testPath, 1024).toString(); - assertTrue("read", "testing".equals(str)); - } finally { - // reset for other tests - localFs.setVerifyChecksum(true); - } - + localFs.setVerifyChecksum(false); + String str = readFile(localFs, testPath, 1024).toString(); + assertTrue("read", "testing".equals(str)); } + @Test public void testMultiChunkFile() throws Exception { - Configuration conf = new Configuration(); - LocalFileSystem localFs = FileSystem.getLocal(conf); Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk"); FSDataOutputStream fout = localFs.create(testPath); for (int i = 0; i < 1000; i++) { @@ -116,9 +118,8 @@ public class TestChecksumFileSystem extends TestCase { * Test to ensure that if the checksum file is truncated, a * ChecksumException is thrown */ + @Test public void testTruncatedChecksum() throws Exception { - Configuration conf = new Configuration(); - LocalFileSystem localFs = FileSystem.getLocal(conf); Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc"); FSDataOutputStream fout = localFs.create(testPath); fout.write("testing truncation".getBytes()); @@ -146,14 +147,60 @@ public class TestChecksumFileSystem extends TestCase { } // telling it not to verify checksums, should avoid issue. 
+ localFs.setVerifyChecksum(false); + String str = readFile(localFs, testPath, 1024).toString(); + assertTrue("read", "testing truncation".equals(str)); + } + + @Test + public void testStreamType() throws Exception { + Path testPath = new Path(TEST_ROOT_DIR, "testStreamType"); + localFs.create(testPath).close(); + FSDataInputStream in = null; + + localFs.setVerifyChecksum(true); + in = localFs.open(testPath); + assertTrue("stream is input checker", + in.getWrappedStream() instanceof FSInputChecker); + + localFs.setVerifyChecksum(false); + in = localFs.open(testPath); + assertFalse("stream is not input checker", + in.getWrappedStream() instanceof FSInputChecker); + } + + @Test + public void testCorruptedChecksum() throws Exception { + Path testPath = new Path(TEST_ROOT_DIR, "testCorruptChecksum"); + Path checksumPath = localFs.getChecksumFile(testPath); + + // write a file to generate checksum + FSDataOutputStream out = localFs.create(testPath, true); + out.write("testing 1 2 3".getBytes()); + out.close(); + assertTrue(localFs.exists(checksumPath)); + FileStatus stat = localFs.getFileStatus(checksumPath); + + // alter file directly so checksum is invalid + out = localFs.getRawFileSystem().create(testPath, true); + out.write("testing stale checksum".getBytes()); + out.close(); + assertTrue(localFs.exists(checksumPath)); + // checksum didn't change on disk + assertEquals(stat, localFs.getFileStatus(checksumPath)); + + Exception e = null; try { - localFs.setVerifyChecksum(false); - String str = readFile(localFs, testPath, 1024).toString(); - assertTrue("read", "testing truncation".equals(str)); - } finally { - // reset for other tests localFs.setVerifyChecksum(true); + readFile(localFs, testPath, 1024); + } catch (ChecksumException ce) { + e = ce; + } finally { + assertNotNull("got checksum error", e); } + localFs.setVerifyChecksum(false); + String str = readFile(localFs, testPath, 1024); + assertEquals("testing stale checksum", str); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java index 18bc8df651..58998be8e6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java @@ -517,6 +517,23 @@ public class TestSequenceFile extends TestCase { assertTrue("InputStream for " + path + " should have been closed.", openedFile[0].isClosed()); } + /** + * Test that makes sure createWriter succeeds on a file that was + * already created + * @throws IOException + */ + public void testCreateWriterOnExistingFile() throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Path name = new Path(new Path(System.getProperty("test.build.data","."), + "createWriterOnExistingFile") , "file"); + + fs.create(name); + SequenceFile.createWriter(fs, conf, name, RandomDatum.class, + RandomDatum.class, 512, (short) 1, 4096, false, + CompressionType.NONE, null, new Metadata()); + } + public void testRecursiveSeqFileCreate() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 67fc608cb2..49e1ed6453 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; @@ -259,7 +260,13 @@ public class TestRPC { SecretManager secretManager) throws IOException { return null; } - + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index d38d823200..aca33ef25b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -32,6 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.net.NetUtils; import org.junit.After; import org.junit.Test; @@ -302,4 +305,72 @@ System.out.println("echo int is NOT supported"); ex.getMessage().contains("VersionMismatch")); } } + + @Test + public void testIsMethodSupported() throws IOException { + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, + false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. 
+ */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, + conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } + } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class protocol, RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/dev-support/test-patch.properties b/hadoop-hdfs-project/dev-support/test-patch.properties index df85aa6e28..9365482b27 100644 --- a/hadoop-hdfs-project/dev-support/test-patch.properties +++ b/hadoop-hdfs-project/dev-support/test-patch.properties @@ -18,4 +18,4 @@ OK_RELEASEAUDIT_WARNINGS=0 OK_FINDBUGS_WARNINGS=0 -OK_JAVADOC_WARNINGS=2 +OK_JAVADOC_WARNINGS=0 diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml index 28abf11624..c1325e495d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml @@ -53,6 +53,11 @@ mockito-all test + + org.apache.hadoop + hadoop-annotations + provided + com.sun.jersey jersey-server diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/sbin/httpfs.sh b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/sbin/httpfs.sh index 6566ab25c2..e45bd42d00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/sbin/httpfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/sbin/httpfs.sh @@ -55,8 +55,8 @@ if [ "${1}" = "stop" ]; then fi if [ "${HTTPFS_SILENT}" != "true" ]; then - ${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@" + ${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@" else - ${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@" > /dev/null + ${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@" > /dev/null fi diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7d777d0c99..eb85455e58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -365,6 +365,8 @@ Release 0.23.1 - UNRELEASED HDFS-2836. HttpFSServer still has 2 javadoc warnings in trunk (revans2 via tucu) + HDFS-2837. 
mvn javadoc:javadoc not seeing LimitedPrivate class (revans2 via tucu) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh index 09eec6e5de..2aabf5300b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh @@ -22,8 +22,6 @@ bin=`which "$0"` bin=`dirname "${bin}"` bin=`cd "$bin"; pwd` -export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}" - DEFAULT_LIBEXEC_DIR="$bin"/../libexec HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1137dd8607..20cbf3b625 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -79,6 +79,9 @@ Trunk (unreleased changes) MAPREDUCE-3664. Federation Documentation has incorrect configuration example. (Brandon Li via jitendra) + MAPREDUCE-3740. Fixed broken mapreduce compilation after the patch for + HADOOP-7965. (Devaraj K via vinodkv) + Release 0.23.1 - Unreleased INCOMPATIBLE CHANGES @@ -197,6 +200,15 @@ Release 0.23.1 - Unreleased MAPREDUCE-2765. DistCp Rewrite. (Mithun Radhakrishnan via mahadev) + MAPREDUCE-3737. The Web Application Proxy's is not documented very well. + (Robert Evans via mahadev) + + MAPREDUCE-3699. Increased RPC handlers for all YARN servers to reasonable + values for working at scale. (Hitesh Shah via vinodkv) + + MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml. + (Roman Shapshonik via acmurthy) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar @@ -220,6 +232,12 @@ Release 0.23.1 - Unreleased MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush for every event slowing down AM. (Siddarth Seth via vinodkv) + MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh + Shah via sseth) + + MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes + on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob @@ -576,6 +594,15 @@ Release 0.23.1 - Unreleased MAPREDUCE-3733. Add Apache License Header to hadoop-distcp/pom.xml. (mahadev) + MAPREDUCE-3735. Add distcp jar to the distribution (tar). + (mahadev) + + MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific + information not available at RM. (vinodkv via acmurthy) + + MAPREDUCE-3742. "yarn logs" command fails with ClassNotFoundException. 
+ (Jason Lowe via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred index 2ffe904917..ff1ebbc67d 100755 --- a/hadoop-mapreduce-project/bin/mapred +++ b/hadoop-mapreduce-project/bin/mapred @@ -91,15 +91,15 @@ if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then fi # for releases, add core mapred jar & webapps to CLASSPATH -if [ -d "$HADOOP_PREFIX/share/hadoop/mapreduce/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/mapreduce +if [ -d "$HADOOP_PREFIX/${MAPRED_DIR}/webapps" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/${MAPRED_DIR} fi -for f in $HADOOP_MAPRED_HOME/share/hadoop-mapreduce/*.jar; do +for f in $HADOOP_MAPRED_HOME/${MAPRED_DIR}/*.jar; do CLASSPATH=${CLASSPATH}:$f; done # add libs to CLASSPATH -for f in $HADOOP_MAPRED_HOME/lib/*.jar; do +for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do CLASSPATH=${CLASSPATH}:$f; done diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 2af7ef2ace..ba6b07e8ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -455,10 +455,14 @@ public class Job extends JobContextImpl implements JobContext { public String toString() { ensureState(JobState.RUNNING); String reasonforFailure = " "; + int numMaps = 0; + int numReduces = 0; try { updateStatus(); if (status.getState().equals(JobStatus.State.FAILED)) reasonforFailure = getTaskFailureEventString(); + numMaps = getTaskReports(TaskType.MAP).length; + numReduces = getTaskReports(TaskType.REDUCE).length; } catch (IOException e) { } catch (InterruptedException ie) { } @@ -468,6 +472,8 @@ public class Job extends JobContextImpl implements JobContext { sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); sb.append("\n"); sb.append("Uber job : ").append(status.isUber()).append("\n"); + sb.append("Number of maps: ").append(numMaps); + sb.append("Number of reduces: ").append(numReduces); sb.append("map() completion: "); sb.append(status.getMapProgress()).append("\n"); sb.append("reduce() completion: "); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 97504ea36c..7b684c5b61 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -412,12 +412,12 @@ public interface MRJobConfig { /** The number of threads used to handle task RPC calls.*/ public static final String MR_AM_TASK_LISTENER_THREAD_COUNT = MR_AM_PREFIX + "job.task.listener.thread-count"; - public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 10; + public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30; /** How often the AM should send heartbeats to the RM.*/ public 
static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = MR_AM_PREFIX + "scheduler.heartbeat.interval-ms"; - public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000; + public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000; /** * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index c42456aafc..b73a9ea2bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.tools; import java.io.IOException; +import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -28,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.ipc.RemoteException; @@ -579,25 +581,28 @@ public class CLI extends Configured implements Tool { } } } - + public void displayJobList(JobStatus[] jobs) throws IOException, InterruptedException { - System.out.println("Total jobs:" + jobs.length); - System.out.println("JobId\tState\tStartTime\t" + - "UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" + - "RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info"); - for (JobStatus job : jobs) { - TaskReport[] mapReports = - cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP); - TaskReport[] reduceReports = - cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE); + displayJobList(jobs, new PrintWriter(System.out)); + } - System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n", + @Private + public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; + @Private + public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n"; + + @Private + public void displayJobList(JobStatus[] jobs, PrintWriter writer) { + writer.println("Total jobs:" + jobs.length); + writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName", + "Queue", "Priority", "UsedContainers", + "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); + for (JobStatus job : jobs) { + writer.printf(dataPattern, job.getJobID().toString(), job.getState(), job.getStartTime(), job.getUsername(), job.getQueue(), job.getPriority().name(), - mapReports.length, - reduceReports.length, job.getNumUsedSlots(), job.getNumReservedSlots(), job.getUsedMem(), @@ -605,6 +610,7 @@ public class CLI extends Configured implements Tool { job.getNeededMem(), job.getSchedulingInfo()); } + writer.flush(); } public static void main(String[] argv) throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 5d02250c49..920f8df455 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -440,6 +440,16 @@ + + mapreduce.admin.user.env + LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native + Expert: Additional execution environment entries for + map and reduce task processes. This is not an additive property. + You must preserve the original value if you want your map and + reduce tasks to have access to native libraries (compression, etc). + + + mapreduce.task.tmp.dir ./tmp @@ -1224,4 +1234,18 @@ mapreduce.job.end-notification.max.retry.interval + + yarn.app.mapreduce.am.job.task.listener.thread-count + 30 + The number of threads used to handle RPC calls in the + MR AppMaster from remote tasks + + + + yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms + 1000 + The interval in ms at which the MR AppMaster should send + heartbeats to the ResourceManager + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java index 3f54e09a33..a49f1fa798 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java @@ -22,19 +22,24 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import org.apache.hadoop.mapred.JobConf; +import java.io.PrintWriter; + import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapreduce.TaskType; +import org.junit.Assert; import org.junit.Test; +@SuppressWarnings("deprecation") public class JobClientUnitTest { public class TestJobClient extends JobClient { @@ -48,7 +53,6 @@ public class JobClientUnitTest { } } - @SuppressWarnings("deprecation") @Test public void testMapTaskReportsWithNullJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); @@ -64,7 +68,6 @@ public class JobClientUnitTest { verify(mockCluster).getJob(id); } - @SuppressWarnings("deprecation") @Test public void testReduceTaskReportsWithNullJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); @@ -80,7 +83,6 @@ public class JobClientUnitTest { verify(mockCluster).getJob(id); } - @SuppressWarnings("deprecation") @Test public void testSetupTaskReportsWithNullJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); @@ -96,7 +98,6 @@ public class JobClientUnitTest { verify(mockCluster).getJob(id); 
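Stepping back to the mapred-default.xml hunk above: the new mapreduce.admin.user.env description warns that the property is not additive, so any site-level override has to repeat the default native-library entry. As a rough illustration (the extra /opt/example/native path is made up), a mapred-site.xml override that keeps native compression libraries reachable could look like:

<property>
  <name>mapreduce.admin.user.env</name>
  <!-- Not additive: repeat the default LD_LIBRARY_PATH entry shown in the hunk
       above, then append the hypothetical extra path. -->
  <value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native:/opt/example/native</value>
</property>

The same hunk also documents the new yarn.app.mapreduce.am.* defaults (30 task-listener threads, 1000 ms RM heartbeat interval), matching the MRJobConfig constants changed earlier in this patch.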
} - @SuppressWarnings("deprecation") @Test public void testCleanupTaskReportsWithNullJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); @@ -115,12 +116,15 @@ public class JobClientUnitTest { @Test public void testShowJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); - JobID jobID = new JobID("test", 0); + + long startTime = System.currentTimeMillis(); + + JobID jobID = new JobID(String.valueOf(startTime), 12345); JobStatus mockJobStatus = mock(JobStatus.class); when(mockJobStatus.getJobID()).thenReturn(jobID); when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING); - when(mockJobStatus.getStartTime()).thenReturn(0L); + when(mockJobStatus.getStartTime()).thenReturn(startTime); when(mockJobStatus.getUsername()).thenReturn("mockuser"); when(mockJobStatus.getQueue()).thenReturn("mockqueue"); when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL); @@ -132,18 +136,21 @@ public class JobClientUnitTest { when(mockJobStatus.getSchedulingInfo()).thenReturn("NA"); Job mockJob = mock(Job.class); - when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]); + when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn( + new TaskReport[5]); Cluster mockCluster = mock(Cluster.class); when(mockCluster.getJob(jobID)).thenReturn(mockJob); client.setCluster(mockCluster); - - client.displayJobList(new JobStatus[] {mockJobStatus}); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out)); + String commandLineOutput = out.toString(); + System.out.println(commandLineOutput); + Assert.assertTrue(commandLineOutput.contains("Total jobs:1")); + verify(mockJobStatus, atLeastOnce()).getJobID(); - verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class)); - verify(mockCluster, atLeastOnce()).getJob(jobID); verify(mockJobStatus).getState(); verify(mockJobStatus).getStartTime(); verify(mockJobStatus).getUsername(); @@ -155,5 +162,9 @@ public class JobClientUnitTest { verify(mockJobStatus).getReservedMem(); verify(mockJobStatus).getNeededMem(); verify(mockJobStatus).getSchedulingInfo(); + + // This call should not go to each AM. 
+ verify(mockCluster, never()).getJob(jobID); + verify(mockJob, never()).getTaskReports(isA(TaskType.class)); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn index 3cf3c798b0..7ceac4feae 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn @@ -140,8 +140,8 @@ if [ -d "$YARN_HOME/build/tools" ]; then CLASSPATH=${CLASSPATH}:$YARN_HOME/build/tools fi -CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/* -CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib/* +CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_DIR}/* +CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_LIB_JARS_DIR}/* # so that filenames w/ spaces are handled correctly in loops below IFS= @@ -194,7 +194,7 @@ elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hadoop.util.RunJar YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" elif [ "$COMMAND" = "logs" ] ; then - CLASS=org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper + CLASS=org.apache.hadoop.yarn.logaggregation.LogDumper YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" elif [ "$COMMAND" = "daemonlog" ] ; then CLASS=org.apache.hadoop.log.LogLevel diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-config.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-config.sh index 2757044273..934a461b64 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-config.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-config.sh @@ -19,8 +19,6 @@ bin=`which "$0"` bin=`dirname "${bin}"` bin=`cd "$bin"; pwd` -export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}" - DEFAULT_LIBEXEC_DIR="$bin"/../libexec HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a030079779..f4cbf6e65b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -90,7 +90,7 @@ public class YarnConfiguration extends Configuration { /** The number of threads used to handle applications manager requests.*/ public static final String RM_CLIENT_THREAD_COUNT = RM_PREFIX + "client.thread-count"; - public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10; + public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 50; /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = @@ -106,7 +106,7 @@ public class YarnConfiguration extends Configuration { /** Number of threads to handle scheduler interface.*/ public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT = RM_PREFIX + "scheduler.client.thread-count"; - public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 10; + public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; /** The address of the RM web application.*/ public static final String RM_WEBAPP_ADDRESS = @@ -184,7 +184,7 @@ public class YarnConfiguration extends Configuration { /** Number of threads to handle resource tracker calls.*/ public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = RM_PREFIX + "resource-tracker.client.thread-count"; - public static final int 
DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 10; + public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50; /** The class to use as the resource scheduler.*/ public static final String RM_SCHEDULER = @@ -257,7 +257,7 @@ public class YarnConfiguration extends Configuration { /** Number of threads container manager uses.*/ public static final String NM_CONTAINER_MGR_THREAD_COUNT = NM_PREFIX + "container-manager.thread-count"; - public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 5; + public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; /** Number of threads used in cleanup.*/ public static final String NM_DELETE_THREAD_COUNT = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java index 2c56d318af..bbece2f34f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java @@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtocolMetaInfoPB; import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcEngine; import org.apache.hadoop.ipc.ClientCache; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -73,6 +75,17 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine { addr, ticket, conf, factory, rpcTimeout)), false); } + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + Class protocol = ProtocolMetaInfoPB.class; + return new ProtocolProxy(protocol, + (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new Invoker(protocol, connId, conf, + factory)), false); + } + private static class Invoker implements InvocationHandler, Closeable { private Map returnTypes = new ConcurrentHashMap(); private boolean isClosed = false; @@ -82,8 +95,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine { public Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); + this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf), conf, factory); + } + + public Invoker(Class protocol, Client.ConnectionId connId, + Configuration conf, SocketFactory factory) { + this.remoteId = connId; this.client = CLIENTS.getClient(conf, factory, ProtoSpecificResponseWritable.class); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index b9e5ea47e4..cea45798a7 100644 --- 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -67,7 +67,7 @@ The number of threads used to handle applications manager requests. yarn.resourcemanager.client.thread-count - 10 + 50 @@ -90,7 +90,7 @@ Number of threads to handle scheduler interface. yarn.resourcemanager.scheduler.client.thread-count - 10 + 50 @@ -179,7 +179,7 @@ Number of threads to handle resource tracker calls. yarn.resourcemanager.resource-tracker.client.thread-count - 10 + 50 @@ -244,7 +244,7 @@ Number of threads container manager uses. yarn.nodemanager.container-manager.thread-count - 5 + 20 diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index 2a37856d5d..e9e1b2fb0b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -29,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -39,9 +38,9 @@ public class ClusterMetrics { private static AtomicBoolean isInitialized = new AtomicBoolean(false); - @Metric("# of NMs") MutableGaugeInt numNMs; - @Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs; - @Metric("# of lost NMs") MutableCounterInt numLostNMs; + @Metric("# of active NMs") MutableGaugeInt numNMs; + @Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs; + @Metric("# of lost NMs") MutableGaugeInt numLostNMs; @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs; @Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs; @@ -73,8 +72,8 @@ public class ClusterMetrics { } } - //Total Nodemanagers - public int getNumNMs() { + //Active Nodemanagers + public int getNumActiveNMs() { return numNMs.value(); } @@ -87,6 +86,10 @@ public class ClusterMetrics { numDecommissionedNMs.incr(); } + public void decrDecommisionedNMs() { + numDecommissionedNMs.decr(); + } + //Lost NMs public int getNumLostNMs() { return numLostNMs.value(); @@ -96,6 +99,10 @@ public class ClusterMetrics { numLostNMs.incr(); } + public void decrNumLostNMs() { + numLostNMs.decr(); + } + //Unhealthy NMs public int getUnhealthyNMs() { return numUnhealthyNMs.value(); @@ -118,6 +125,10 @@ public class ClusterMetrics { numRebootedNMs.incr(); } + public void decrNumRebootedNMs() { + numRebootedNMs.decr(); + } + public void removeNode(RMNodeEventType nodeEventType) { numNMs.decr(); switch(nodeEventType){ diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 3d975818f2..117e77cb77 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -43,6 +43,8 @@ public interface RMContext { ApplicationsStore getApplicationsStore(); ConcurrentMap getRMApps(); + + ConcurrentMap getInactiveRMNodes(); ConcurrentMap getRMNodes(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index a177f1cc16..029a22c8fe 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -43,6 +43,9 @@ public class RMContextImpl implements RMContext { private final ConcurrentMap nodes = new ConcurrentHashMap(); + + private final ConcurrentMap inactiveNodes + = new ConcurrentHashMap(); private AMLivelinessMonitor amLivelinessMonitor; private ContainerAllocationExpirer containerAllocationExpirer; @@ -83,6 +86,11 @@ public class RMContextImpl implements RMContext { public ConcurrentMap getRMNodes() { return this.nodes; } + + @Override + public ConcurrentMap getInactiveRMNodes() { + return this.inactiveNodes; + } @Override public ContainerAllocationExpirer getContainerAllocationExpirer() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index ccebe3a890..75c91aa83f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -220,10 +220,6 @@ public class ResourceTrackerService extends AbstractService implements if (rmNode == null) { /* node does not exist */ LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); - - // Updating the metrics directly as reboot event cannot be - // triggered on a null rmNode - ClusterMetrics.getMetrics().incrNumRebootedNMs(); return reboot; } diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7f2b48f85b..4e79540733 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -119,7 +119,7 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType.DECOMMISSION, new RemoveNodeTransition()) .addTransition(RMNodeState.RUNNING, RMNodeState.LOST, RMNodeEventType.EXPIRE, new RemoveNodeTransition()) - .addTransition(RMNodeState.RUNNING, RMNodeState.LOST, + .addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED, RMNodeEventType.REBOOTING, new RemoveNodeTransition()) .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) @@ -307,6 +307,21 @@ public class RMNodeImpl implements RMNode, EventHandler { public static class AddNodeTransition implements SingleArcTransition { + + private void updateMetrics(RMNodeState nodeState) { + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + switch (nodeState) { + case LOST: + metrics.decrNumLostNMs(); + break; + case REBOOTED: + metrics.decrNumRebootedNMs(); + break; + case DECOMMISSIONED: + metrics.decrDecommisionedNMs(); + break; + } + } @SuppressWarnings("unchecked") @Override @@ -315,6 +330,13 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); + + String host = rmNode.nodeId.getHost(); + if (rmNode.context.getInactiveRMNodes().containsKey(host)) { + RMNode node = rmNode.context.getInactiveRMNodes().get(host); + rmNode.context.getInactiveRMNodes().remove(host); + updateMetrics(node.getState()); + } ClusterMetrics.getMetrics().addNode(); } @@ -353,7 +375,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // Remove the node from the system. rmNode.context.getRMNodes().remove(rmNode.nodeId); LOG.info("Removed Node " + rmNode.nodeId); - + rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode); //Update the metrics ClusterMetrics.getMetrics().removeNode(event.getType()); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java index 4b3d33c177..92a84a244c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java @@ -68,7 +68,7 @@ public class MetricsOverviewTable extends HtmlBlock { th().$class("ui-state-default")._("Memory Used")._(). 
th().$class("ui-state-default")._("Memory Total")._(). th().$class("ui-state-default")._("Memory Reserved")._(). - th().$class("ui-state-default")._("Total Nodes")._(). + th().$class("ui-state-default")._("Active Nodes")._(). th().$class("ui-state-default")._("Decommissioned Nodes")._(). th().$class("ui-state-default")._("Lost Nodes")._(). th().$class("ui-state-default")._("Unhealthy Nodes")._(). @@ -82,7 +82,7 @@ public class MetricsOverviewTable extends HtmlBlock { td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)). td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)). td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)). - td().a(url("nodes"),String.valueOf(clusterMetrics.getTotalNodes()))._(). + td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._(). td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._(). td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._(). td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._(). diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java index 79c371211c..cb6b6c5ad0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit; +import java.util.Collection; + import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import com.google.inject.Inject; @@ -79,7 +82,19 @@ class NodesPage extends RmView { if(type != null && !type.isEmpty()) { stateFilter = RMNodeState.valueOf(type.toUpperCase()); } - for (RMNode ni : this.rmContext.getRMNodes().values()) { + Collection rmNodes = this.rmContext.getRMNodes().values(); + boolean isInactive = false; + if (stateFilter != null) { + switch (stateFilter) { + case DECOMMISSIONED: + case LOST: + case REBOOTED: + rmNodes = this.rmContext.getInactiveRMNodes().values(); + isInactive = true; + break; + } + } + for (RMNode ni : rmNodes) { if(stateFilter != null) { RMNodeState state = ni.getState(); if(!stateFilter.equals(state)) { @@ -89,12 +104,17 @@ class NodesPage extends RmView { NodeInfo info = new NodeInfo(ni, sched); int usedMemory = (int)info.getUsedMemory(); int availableMemory = (int)info.getAvailableMemory(); - tbody.tr(). + TR>> row = tbody.tr(). td(info.getRack()). 
td(info.getState()). - td(info.getNodeId()). - td().a("http://" + info.getNodeHTTPAddress(), info.getNodeHTTPAddress())._(). - td(info.getHealthStatus()). + td(info.getNodeId()); + if (isInactive) { + row.td()._("N/A")._(); + } else { + String httpAddress = info.getNodeHTTPAddress(); + row.td().a("http://" + httpAddress, httpAddress)._(); + } + row.td(info.getHealthStatus()). td(Times.format(info.getLastHealthUpdate())). td(info.getHealthReport()). td(String.valueOf(info.getNumContainers())). diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 06551b21a8..449ba758e8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.ConcurrentMap; import javax.servlet.http.HttpServletRequest; @@ -68,6 +69,7 @@ import com.google.inject.Singleton; @Singleton @Path("/ws/v1/cluster") public class RMWebServices { + private static final String EMPTY = ""; private static final Log LOG = LogFactory.getLog(RMWebServices.class); private final ResourceManager rm; private static RecordFactory recordFactory = RecordFactoryProvider @@ -144,12 +146,23 @@ public class RMWebServices { if (sched == null) { throw new NotFoundException("Null ResourceScheduler instance"); } - + Collection rmNodes = this.rm.getRMContext().getRMNodes().values(); + boolean isInactive = false; + if (filterState != null && !filterState.isEmpty()) { + RMNodeState nodeState = RMNodeState.valueOf(filterState.toUpperCase()); + switch (nodeState) { + case DECOMMISSIONED: + case LOST: + case REBOOTED: + rmNodes = this.rm.getRMContext().getInactiveRMNodes().values(); + isInactive = true; + break; + } + } NodesInfo allNodes = new NodesInfo(); - for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) { + for (RMNode ni : rmNodes) { NodeInfo nodeInfo = new NodeInfo(ni, sched); if (filterState != null) { - RMNodeState.valueOf(filterState); if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) { continue; } @@ -165,6 +178,9 @@ public class RMWebServices { continue; } } + if (isInactive) { + nodeInfo.setNodeHTTPAddress(EMPTY); + } allNodes.add(nodeInfo); } return allNodes; @@ -183,10 +199,19 @@ public class RMWebServices { } NodeId nid = ConverterUtils.toNodeId(nodeId); RMNode ni = this.rm.getRMContext().getRMNodes().get(nid); + boolean isInactive = false; if (ni == null) { - throw new NotFoundException("nodeId, " + nodeId + ", is not found"); + ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost()); + if (ni == null) { + throw new NotFoundException("nodeId, " + nodeId + ", is not found"); + } + isInactive = true; } - return new NodeInfo(ni, sched); + NodeInfo nodeInfo = new NodeInfo(ni, sched); + if (isInactive) { + nodeInfo.setNodeHTTPAddress(EMPTY); + } + return nodeInfo; } @GET diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index fcf878346c..7d63b057a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -44,6 +44,7 @@ public class ClusterMetricsInfo { protected int unhealthyNodes; protected int decommissionedNodes; protected int rebootedNodes; + protected int activeNodes; public ClusterMetricsInfo() { } // JAXB needs this @@ -59,12 +60,13 @@ public class ClusterMetricsInfo { this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB; this.containersAllocated = metrics.getAllocatedContainers(); this.totalMB = availableMB + reservedMB + allocatedMB; - this.totalNodes = clusterMetrics.getNumNMs(); + this.activeNodes = clusterMetrics.getNumActiveNMs(); this.lostNodes = clusterMetrics.getNumLostNMs(); this.unhealthyNodes = clusterMetrics.getUnhealthyNMs(); this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs(); this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); - + this.totalNodes = activeNodes + lostNodes + decommissionedNodes + + rebootedNodes; } public int getAppsSubmitted() { @@ -94,6 +96,10 @@ public class ClusterMetricsInfo { public int getTotalNodes() { return this.totalNodes; } + + public int getActiveNodes() { + return this.activeNodes; + } public int getLostNodes() { return this.lostNodes; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java index bafecbb338..facd73aef6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java @@ -94,6 +94,10 @@ public class NodeInfo { public String getNodeHTTPAddress() { return this.nodeHTTPAddress; } + + public void setNodeHTTPAddress(String nodeHTTPAddress) { + this.nodeHTTPAddress = nodeHTTPAddress; + } public String getHealthStatus() { return this.healthStatus; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index bd44f10b9e..3434b3c434 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -81,13 +81,20 @@ public class MockNM { } public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { - return nodeHeartbeat(new HashMap>(), b); + return nodeHeartbeat(new HashMap>(), + b, ++responseId); } public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { + return nodeHeartbeat(conts, isHealthy, ++responseId); + } + + public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); + status.setResponseId(resId); status.setNodeId(nodeId); for (Map.Entry> entry : conts.entrySet()) { status.setContainersStatuses(entry.getValue()); @@ -97,7 +104,6 @@ public class MockNM { healthStatus.setIsNodeHealthy(isHealthy); healthStatus.setLastHealthReportTime(1); status.setNodeHealthStatus(healthStatus); - status.setResponseId(++responseId); req.setNodeStatus(status); return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 90b43504c1..7ded620043 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -56,6 +56,17 @@ public class MockNodes { } return list; } + + public static List lostNodes(int racks, int nodesPerRack, + Resource perNode) { + List list = Lists.newArrayList(); + for (int i = 0; i < racks; ++i) { + for (int j = 0; j < nodesPerRack; ++j) { + list.add(lostNodeInfo(i, perNode, RMNodeState.LOST)); + } + } + return list; + } public static NodeId newNodeID(String host, int port) { NodeId nid = recordFactory.newRecordInstance(NodeId.class); @@ -82,92 +93,120 @@ public class MockNodes { return rs; } - public static RMNode newNodeInfo(int rack, final Resource perNode) { + private static class MockRMNodeImpl implements RMNode { + private NodeId nodeId; + private String hostName; + private String nodeAddr; + private String httpAddress; + private int cmdPort; + private Resource perNode; + private String rackName; + private NodeHealthStatus nodeHealthStatus; + private RMNodeState state; + + public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, + Resource perNode, String rackName, NodeHealthStatus nodeHealthStatus, + int cmdPort, String hostName, RMNodeState state) { + this.nodeId = nodeId; + this.nodeAddr = nodeAddr; + this.httpAddress = httpAddress; + this.perNode = perNode; + this.rackName = rackName; + this.nodeHealthStatus = nodeHealthStatus; + this.cmdPort = cmdPort; + this.hostName = hostName; + this.state = state; + } + + @Override + public NodeId getNodeID() { + return this.nodeId; + } + + @Override + public String getHostName() { + return this.hostName; + } + + @Override + public int getCommandPort() { + return this.cmdPort; + } + + @Override + 
public int getHttpPort() { + return 0; + } + + @Override + public String getNodeAddress() { + return this.nodeAddr; + } + + @Override + public String getHttpAddress() { + return this.httpAddress; + } + + @Override + public NodeHealthStatus getNodeHealthStatus() { + return this.nodeHealthStatus; + } + + @Override + public Resource getTotalCapability() { + return this.perNode; + } + + @Override + public String getRackName() { + return this.rackName; + } + + @Override + public Node getNode() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public RMNodeState getState() { + return this.state; + } + + @Override + public List getContainersToCleanUp() { + return null; + } + + @Override + public List getAppsToCleanup() { + return null; + } + + @Override + public HeartbeatResponse getLastHeartBeatResponse() { + return null; + } + }; + + private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) { final String rackName = "rack"+ rack; final int nid = NODE_ID++; final String hostName = "host"+ nid; final int port = 123; final NodeId nodeID = newNodeID(hostName, port); - final String httpAddress = "localhost:0"; + final String httpAddress = httpAddr; final NodeHealthStatus nodeHealthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class); - final Resource used = newUsedResource(perNode); - final Resource avail = newAvailResource(perNode, used); - return new RMNode() { - @Override - public NodeId getNodeID() { - return nodeID; - } + return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName, + nodeHealthStatus, nid, hostName, state); + } - @Override - public String getNodeAddress() { - return hostName; - } + public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) { + return buildRMNode(rack, perNode, state, "N/A"); + } - @Override - public String getHttpAddress() { - return httpAddress; - } - - @Override - public Resource getTotalCapability() { - return perNode; - } - - @Override - public String getRackName() { - return rackName; - } - - @Override - public Node getNode() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public NodeHealthStatus getNodeHealthStatus() { - return nodeHealthStatus; - } - - @Override - public int getCommandPort() { - return nid; - } - - @Override - public int getHttpPort() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public String getHostName() { - return hostName; - } - - @Override - public RMNodeState getState() { - // TODO Auto-generated method stub - return null; - } - - @Override - public List getAppsToCleanup() { - // TODO Auto-generated method stub - return null; - } - - @Override - public List getContainersToCleanUp() { - // TODO Auto-generated method stub - return null; - } - - @Override - public HeartbeatResponse getLastHeartBeatResponse() { - // TODO Auto-generated method stub - return null; - } - }; + public static RMNode newNodeInfo(int rack, final Resource perNode) { + return buildRMNode(rack, perNode, null, "localhost:0"); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 1b10854eeb..6d9e726e87 100644 --- 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -130,6 +130,12 @@ public class MockRM extends ResourceManager { nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); } + + public void sendNodeLost(MockNM nm) throws Exception { + RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( + nm.getNodeId()); + node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + } public void NMwaitForState(NodeId nodeid, RMNodeState finalState) throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 4b4fb92e05..ccd8d57a0d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -31,6 +31,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; @@ -100,8 +101,8 @@ public class TestRMNodeTransitions { rmDispatcher.register(SchedulerEventType.class, new TestSchedulerEventDispatcher()); - - node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null); + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 183396092b..8b3f4a08e9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -157,14 +157,14 @@ public class TestResourceTrackerService { rm.start(); MockNM nm1 = rm.registerNode("host1:1234", 5120); - MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService()); + MockNM nm2 = rm.registerNode("host2:1234", 2048); int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs(); HeartbeatResponse nodeHeartbeat = 
nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat( - new HashMap>(), true); + new HashMap>(), true, -100); Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); checkRebootedNMCount(rm, ++initialMetricCount); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java index 4a264fd24b..a32c285eec 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock; import org.apache.hadoop.yarn.webapp.test.WebAppTests; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -36,39 +37,65 @@ import com.google.inject.Module; * data for all the columns in the table as specified in the header. */ public class TestNodesPage { + + final int numberOfRacks = 2; + final int numberOfNodesPerRack = 2; + // Number of Actual Table Headers for NodesPage.NodesBlock might change in + // future. In that case this value should be adjusted to the new value. + final int numberOfThInMetricsTable = 10; + final int numberOfActualTableHeaders = 10; - @Test - public void testNodesBlockRender() throws Exception { - final int numberOfRacks = 2; - final int numberOfNodesPerRack = 2; - // Number of Actual Table Headers for NodesPage.NodesBlock might change in - // future. In that case this value should be adjusted to the new value. 
- final int numberOfThInMetricsTable = 10; - final int numberOfActualTableHeaders = 10; - - Injector injector = WebAppTests.createMockInjector(RMContext.class, - TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB), - new Module() { + private Injector injector; + + @Before + public void setUp() throws Exception { + injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp + .mockRMContext(3, numberOfRacks, numberOfNodesPerRack, + 8 * TestRMWebApp.GiB), new Module() { @Override public void configure(Binder binder) { try { - binder.bind(ResourceManager.class).toInstance(TestRMWebApp.mockRm(3, - numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB)); + binder.bind(ResourceManager.class).toInstance( + TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack, + 8 * TestRMWebApp.GiB)); } catch (IOException e) { throw new IllegalStateException(e); } } }); + } + + @Test + public void testNodesBlockRender() throws Exception { injector.getInstance(NodesBlock.class).render(); PrintWriter writer = injector.getInstance(PrintWriter.class); WebAppTests.flushOutput(injector); - Mockito.verify(writer, Mockito.times(numberOfActualTableHeaders + - numberOfThInMetricsTable)).print( - " lostNodes = MockNodes.lostNodes(racks, numNodes, + newResource(mbsPerNode)); + final ConcurrentMap lostNodesMap = Maps.newConcurrentMap(); + for (RMNode node : lostNodes) { + lostNodesMap.put(node.getHostName(), node); + } return new RMContextImpl(new MemStore(), null, null, null, null) { @Override public ConcurrentMap getRMApps() { return applicationsMaps; } @Override + public ConcurrentMap getInactiveRMNodes() { + return lostNodesMap; + } + @Override public ConcurrentMap getRMNodes() { return nodesMap; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index 51d2fa7d0d..746eec234e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -370,7 +370,8 @@ public class TestRMWebServices extends JerseyTest { WebServicesTestUtils.getXmlInt(element, "lostNodes"), WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"), WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"), - WebServicesTestUtils.getXmlInt(element, "rebootedNodes")); + WebServicesTestUtils.getXmlInt(element, "rebootedNodes"), + WebServicesTestUtils.getXmlInt(element, "activeNodes")); } } @@ -378,7 +379,7 @@ public class TestRMWebServices extends JerseyTest { Exception { assertEquals("incorrect number of elements", 1, json.length()); JSONObject clusterinfo = json.getJSONObject("clusterMetrics"); - assertEquals("incorrect number of elements", 11, clusterinfo.length()); + assertEquals("incorrect number of elements", 12, clusterinfo.length()); verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"), clusterinfo.getInt("allocatedMB"), @@ -386,13 +387,13 @@ public class TestRMWebServices extends 
         clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
         clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
         clusterinfo.getInt("decommissionedNodes"),
-        clusterinfo.getInt("rebootedNodes"));
+        clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
   }
 
   public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
       int allocMB, int containersAlloc, int totalMB, int totalNodes,
       int lostNodes, int unhealthyNodes, int decommissionedNodes,
-      int rebootedNodes) throws JSONException, Exception {
+      int rebootedNodes, int activeNodes) throws JSONException, Exception {
 
     ResourceScheduler rs = rm.getResourceScheduler();
     QueueMetrics metrics = rs.getRootQueueMetrics();
@@ -412,8 +413,11 @@ public class TestRMWebServices extends JerseyTest {
         * MB_IN_GB, allocMB);
     assertEquals("containersAllocated doesn't match", 0, containersAlloc);
     assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
-    assertEquals("totalNodes doesn't match", clusterMetrics.getNumNMs(),
-        totalNodes);
+    assertEquals(
+        "totalNodes doesn't match",
+        clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
+            + clusterMetrics.getNumDecommisionedNMs()
+            + clusterMetrics.getNumRebootedNMs(), totalNodes);
     assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
         lostNodes);
     assertEquals("unhealthyNodes doesn't match",
@@ -422,6 +426,8 @@ public class TestRMWebServices extends JerseyTest {
         clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes);
     assertEquals("rebootedNodes doesn't match",
         clusterMetrics.getNumRebootedNMs(), rebootedNodes);
+    assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
+        activeNodes);
   }
 
   @Test
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 8a52ac153d..8886d6e451 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -202,6 +202,69 @@ public class TestRMWebServicesNodes extends JerseyTest {
       rm.stop();
     }
   }
+
+  @Test
+  public void testNodesQueryStateLost() throws JSONException, Exception {
+    WebResource r = resource();
+    MockNM nm1 = rm.registerNode("h1:1234", 5120);
+    MockNM nm2 = rm.registerNode("h2:1234", 5120);
+    rm.sendNodeStarted(nm1);
+    rm.sendNodeStarted(nm2);
+    rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
+    rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
+    rm.sendNodeLost(nm1);
+    rm.sendNodeLost(nm2);
+
+    ClientResponse response = r.path("ws").path("v1").path("cluster")
+        .path("nodes").queryParam("state", RMNodeState.LOST.toString())
+        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    JSONObject json = response.getEntity(JSONObject.class);
+    JSONObject nodes = json.getJSONObject("nodes");
+    assertEquals("incorrect number of elements", 1, nodes.length());
+    JSONArray nodeArray = nodes.getJSONArray("node");
assertEquals("incorrect number of elements", 2, nodeArray.length()); + for (int i = 0; i < nodeArray.length(); ++i) { + JSONObject info = nodeArray.getJSONObject(i); + String host = info.get("id").toString().split(":")[0]; + RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host); + WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", + info.getString("nodeHTTPAddress")); + WebServicesTestUtils.checkStringMatch("state", rmNode.getState() + .toString(), info.getString("state")); + } + } + + @Test + public void testSingleNodeQueryStateLost() throws JSONException, Exception { + WebResource r = resource(); + MockNM nm1 = rm.registerNode("h1:1234", 5120); + MockNM nm2 = rm.registerNode("h2:1234", 5120); + rm.sendNodeStarted(nm1); + rm.sendNodeStarted(nm2); + rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING); + rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING); + rm.sendNodeLost(nm1); + rm.sendNodeLost(nm2); + + ClientResponse response = r.path("ws").path("v1").path("cluster") + .path("nodes").path("h2:1234").accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + JSONObject info = json.getJSONObject("node"); + String id = info.get("id").toString(); + + assertEquals("Incorrect Node Information.", "h2:1234", id); + + RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2"); + WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", + info.getString("nodeHTTPAddress")); + WebServicesTestUtils.checkStringMatch("state", + rmNode.getState().toString(), info.getString("state")); + } @Test public void testNodesQueryHealthy() throws JSONException, Exception { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WebApplicationProxy.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WebApplicationProxy.apt.vm new file mode 100644 index 0000000000..4646235372 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WebApplicationProxy.apt.vm @@ -0,0 +1,49 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. + + --- + YARN + --- + --- + ${maven.build.timestamp} + +Web Application Proxy + + The Web Application Proxy is part of YARN. By default it will run as part of + the Resource Manager(RM), but can be configured to run in stand alone mode. + The reason for the proxy is to reduce the possibility of web based attacks + through YARN. + + In YARN the Application Master(AM) has the responsibility to provide a web UI + and to send that link to the RM. This opens up a number of potential + issues. The RM runs as a trusted user, and people visiting that web + address will treat it, and links it provides to them as trusted, when in + reality the AM is running as a non-trusted user, and the links it gives to + the RM could point to anything malicious or otherwise. 
+  Proxy mitigates this risk by warning users that do not own the given
+  application that they are connecting to an untrusted site.
+
+  In addition to this, the proxy also tries to reduce the impact that a
+  malicious AM could have on a user. It primarily does this by stripping out
+  cookies from the user, and replacing them with a single cookie providing the
+  user name of the logged-in user. This is because most web-based
+  authentication systems identify a user based on a cookie. Providing this
+  cookie to an untrusted application opens up the potential for an exploit. If
+  the cookie is designed properly that potential should be fairly minimal, but
+  this is just to reduce that potential attack vector. The current proxy
+  implementation does nothing to prevent the AM from providing links to
+  malicious external sites, nor does it prevent malicious JavaScript code from
+  running. In fact, JavaScript can be used to get the cookies, so stripping
+  the cookies from the request has minimal benefit at this time.
+
+  In the future we hope to address the attack vectors described above and make
+  attaching to an AM's web UI safer.
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
index 5f3883fd1a..ced6c3471d 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
@@ -47,4 +47,6 @@ MapReduce NextGen aka YARN aka MRv2
 
   * {{{./CapacityScheduler.html}Capacity Scheduler}}
 
+  * {{{./WebApplicationProxy.html}Web Application Proxy}}
+
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 8e3d49dc2f..2753c60273 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -223,6 +223,11 @@
         hadoop-archives
         ${project.version}
+
+        org.apache.hadoop
+        hadoop-distcp
+        ${project.version}
+
       org.apache.hadoop
       hadoop-rumen
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 95c3325775..df7afbb05f 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -61,6 +61,7 @@
+
diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml
index 0d8bf40a38..e796f53f3b 100644
--- a/hadoop-tools/hadoop-distcp/pom.xml
+++ b/hadoop-tools/hadoop-distcp/pom.xml
@@ -20,7 +20,7 @@
     0.24.0-SNAPSHOT
     ../../hadoop-project
-  org.apache.hadoop.tools
+  org.apache.hadoop
   hadoop-distcp
   0.24.0-SNAPSHOT
   Apache Hadoop Distributed Copy
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index af2707021a..99df3efed7 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -38,6 +38,11 @@
       hadoop-streaming
       compile
+
+      org.apache.hadoop
+      hadoop-distcp
+      compile
+
       org.apache.hadoop
       hadoop-archives