Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1236936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-28 00:21:07 +00:00
commit 2b2e56056e
67 changed files with 1492 additions and 259 deletions

View File

@ -18,4 +18,4 @@
OK_RELEASEAUDIT_WARNINGS=0
OK_FINDBUGS_WARNINGS=0
OK_JAVADOC_WARNINGS=6
OK_JAVADOC_WARNINGS=13

View File

@ -74,7 +74,11 @@ Trunk (unreleased changes)
HADOOP-7987. Support setting the run-as user in unsecure mode. (jitendra)
HADOOP-7965. Support for protocol version and signature in PB. (jitendra)
BUGS
HADOOP-7998. CheckFileSystem does not correctly honor setVerifyChecksum
(Daryn Sharp via bobby)
HADOOP-7851. Configuration.getClasses() never returns the default value.
(Uma Maheswara Rao G via amarrk)
@ -209,6 +213,8 @@ Release 0.23.1 - Unreleased
HADOOP-7919. Remove the unused hadoop.logfile.* properties from the
core-default.xml file. (harsh)
HADOOP-7939. Improve Hadoop subcomponent integration in Hadoop 0.23. (rvs via tucu)
OPTIMIZATIONS
BUG FIXES
@ -290,6 +296,15 @@ Release 0.23.1 - Unreleased
HADOOP-7981. Improve documentation for org.apache.hadoop.io.compress.
Decompressor.getRemaining (Jonathan Eagles via mahadev)
HADOOP-7997. SequenceFile.createWriter(...createParent...) no
longer works on existing file. (Gregory Chanan via eli)
HADOOP-7993. Hadoop ignores old-style config options for enabling compressed
output. (Anupam Seth via mahadev)
HADOOP-8000. fetchdt command not available in bin/hadoop.
(Arpit Gupta via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -274,4 +274,8 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.HadoopRpcProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
</FindBugsFilter>

View File

@ -51,7 +51,7 @@ fi
COMMAND=$1
case $COMMAND in
#hdfs commands
namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer)
namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt)
echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2
echo "Instead use the hdfs command for it." 1>&2
echo "" 1>&2

View File

@ -25,9 +25,21 @@ common_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)
script="$(basename -- "$this")"
this="$common_bin/$script"
[ -f "$common_bin/hadoop-layout.sh" ] && . "$common_bin/hadoop-layout.sh"
HADOOP_COMMON_DIR=${HADOOP_COMMON_DIR:-"share/hadoop/common"}
HADOOP_COMMON_LIB_JARS_DIR=${HADOOP_COMMON_LIB_JARS_DIR:-"share/hadoop/common/lib"}
HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_COMMON_LIB_NATIVE_DIR:-"lib/native"}
HDFS_DIR=${HDFS_DIR:-"share/hadoop/hdfs"}
HDFS_LIB_JARS_DIR=${HDFS_LIB_JARS_DIR:-"share/hadoop/hdfs/lib"}
YARN_DIR=${YARN_DIR:-"share/hadoop/mapreduce"}
YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"}
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
# the root of the Hadoop installation
# See HADOOP-6255 for directory structure layout
HADOOP_DEFAULT_PREFIX=`dirname "$this"`/..
HADOOP_DEFAULT_PREFIX=$(cd -P -- "$common_bin"/.. && pwd -P)
HADOOP_PREFIX=${HADOOP_PREFIX:-$HADOOP_DEFAULT_PREFIX}
export HADOOP_PREFIX
@ -144,16 +156,22 @@ CLASSPATH="${HADOOP_CONF_DIR}"
# so that filenames w/ spaces are handled correctly in loops below
IFS=
if [ "$HADOOP_COMMON_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_DIR" ]; then
HADOOP_COMMON_HOME=$HADOOP_PREFIX
fi
fi
# for releases, add core hadoop jar & webapps to CLASSPATH
if [ -d "$HADOOP_PREFIX/share/hadoop/common/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/webapps
if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR
fi
if [ -d "$HADOOP_PREFIX/share/hadoop/common/lib" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/lib'/*'
if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common'/*'
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR'/*'
# add user-specified CLASSPATH last
if [ "$HADOOP_CLASSPATH" != "" ]; then
@ -185,13 +203,13 @@ fi
# setup 'java.library.path' for native-hadoop code if necessary
if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/lib/native" ]; then
if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then
if [ -d "${HADOOP_PREFIX}/lib/native" ]; then
if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then
if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/lib/native
JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR
else
JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib/native
JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR
fi
fi
fi
@ -216,37 +234,56 @@ HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
# put hdfs in classpath if present
if [ "$HADOOP_HDFS_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/share/hadoop/hdfs" ]; then
if [ -d "${HADOOP_PREFIX}/$HDFS_DIR" ]; then
HADOOP_HDFS_HOME=$HADOOP_PREFIX
fi
fi
if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs
if [ -d "$HADOOP_HDFS_HOME/$HDFS_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR
fi
if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib'/*'
if [ -d "$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs'/*'
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR'/*'
# put yarn in classpath if present
if [ "$YARN_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/share/hadoop/mapreduce" ]; then
if [ -d "${HADOOP_PREFIX}/$YARN_DIR" ]; then
YARN_HOME=$HADOOP_PREFIX
fi
fi
if [ -d "$YARN_HOME/share/hadoop/mapreduce/webapps" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce
if [ -d "$YARN_HOME/$YARN_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR
fi
if [ -d "$YARN_HOME/share/hadoop/mapreduce/lib" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib'/*'
if [ -d "$YARN_HOME/$YARN_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce'/*'
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR'/*'
# put mapred in classpath if present AND different from YARN
if [ "$HADOOP_MAPRED_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$MAPRED_DIR" ]; then
HADOOP_MAPRED_HOME=$HADOOP_PREFIX
fi
fi
if [ "$HADOOP_MAPRED_HOME/$MAPRED_DIR" != "$YARN_HOME/$YARN_DIR" ] ; then
if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR
fi
if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR'/*'
fi
# cygwin path translation
if $cygwin; then

View File

@ -345,7 +345,17 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
}
return name;
}
private void handleDeprecation() {
LOG.debug("Handling deprecation for all properties in config...");
Set<Object> keys = new HashSet<Object>();
keys.addAll(getProps().keySet());
for (Object item: keys) {
LOG.debug("Handling deprecation for " + (String)item);
handleDeprecation((String)item);
}
}
static{
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
@ -1667,7 +1677,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
Element conf = doc.createElement("configuration");
doc.appendChild(conf);
conf.appendChild(doc.createTextNode("\n"));
getProps(); // ensure properties is set
handleDeprecation(); //ensure properties is set and deprecation is handled
for (Enumeration e = properties.keys(); e.hasMoreElements();) {
String name = (String)e.nextElement();
Object object = properties.get(name);

View File

@ -304,8 +304,9 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new FSDataInputStream(
new ChecksumFSInputChecker(this, f, bufferSize));
return verifyChecksum
? new FSDataInputStream(new ChecksumFSInputChecker(this, f, bufferSize))
: getRawFileSystem().open(f, bufferSize);
}
/** {@inheritDoc} */

View File

@ -467,7 +467,7 @@ public class SequenceFile {
Metadata metadata) throws IOException {
return createWriter(FileContext.getFileContext(fs.getUri(), conf),
conf, name, keyClass, valClass, compressionType, codec,
metadata, EnumSet.of(CreateFlag.CREATE),
metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
CreateOpts.bufferSize(bufferSize),
createParent ? CreateOpts.createParent()
: CreateOpts.donotCreateParent(),

View File

@ -17,11 +17,10 @@
*/
package org.apache.hadoop.io.retry;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;
@ -29,8 +28,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.util.ThreadUtil;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcInvocationHandler;
class RetryInvocationHandler implements InvocationHandler, Closeable {
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private FailoverProxyProvider proxyProvider;
@ -160,4 +161,11 @@ class RetryInvocationHandler implements InvocationHandler, Closeable {
proxyProvider.close();
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(currentProxy);
return inv.getConnectionId();
}
}

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.ipc;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
@ -37,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
@ -51,7 +50,6 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@ -80,8 +78,19 @@ public class ProtobufRpcEngine implements RpcEngine {
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements InvocationHandler, Closeable {
private static class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
@ -93,12 +102,20 @@ public class ProtobufRpcEngine implements RpcEngine {
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory,
RpcResponseWritable.class);
this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
}
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
@ -222,6 +239,11 @@ public class ProtobufRpcEngine implements RpcEngine {
returnTypes.put(method.getName(), prototype);
return prototype;
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
return remoteId;
}
}
@Override

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
/**
* Protocol to get versions and signatures for supported protocols from the
* server.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations.
*/
@ProtocolInfo(
protocolName = "org.apache.hadoop.ipc.ProtocolMetaInfoPB",
protocolVersion = 1)
public interface ProtocolMetaInfoPB extends
ProtocolInfoService.BlockingInterface {
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolVersionProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class serves the requests for protocol versions and signatures by
* looking them up in the server registry.
*/
public class ProtocolMetaInfoServerSideTranslatorPB implements
ProtocolMetaInfoPB {
RPC.Server server;
public ProtocolMetaInfoServerSideTranslatorPB(RPC.Server server) {
this.server = server;
}
@Override
public GetProtocolVersionsResponseProto getProtocolVersions(
RpcController controller, GetProtocolVersionsRequestProto request)
throws ServiceException {
String protocol = request.getProtocol();
GetProtocolVersionsResponseProto.Builder builder =
GetProtocolVersionsResponseProto.newBuilder();
for (RpcKind r : RpcKind.values()) {
long[] versions;
try {
versions = getProtocolVersionForRpcKind(r, protocol);
} catch (ClassNotFoundException e) {
throw new ServiceException(e);
}
ProtocolVersionProto.Builder b = ProtocolVersionProto.newBuilder();
if (versions != null) {
b.setRpcKind(r.toString());
for (long v : versions) {
b.addVersions(v);
}
}
builder.addProtocolVersions(b.build());
}
return builder.build();
}
@Override
public GetProtocolSignatureResponseProto getProtocolSignature(
RpcController controller, GetProtocolSignatureRequestProto request)
throws ServiceException {
GetProtocolSignatureResponseProto.Builder builder = GetProtocolSignatureResponseProto
.newBuilder();
String protocol = request.getProtocol();
String rpcKind = request.getRpcKind();
long[] versions;
try {
versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind),
protocol);
} catch (ClassNotFoundException e1) {
throw new ServiceException(e1);
}
if (versions == null) {
return builder.build();
}
for (long v : versions) {
ProtocolSignatureProto.Builder sigBuilder = ProtocolSignatureProto
.newBuilder();
sigBuilder.setVersion(v);
try {
ProtocolSignature signature = ProtocolSignature.getProtocolSignature(
protocol, v);
for (int m : signature.getMethods()) {
sigBuilder.addMethods(m);
}
} catch (ClassNotFoundException e) {
throw new ServiceException(e);
}
builder.addProtocolSignature(sigBuilder.build());
}
return builder.build();
}
private long[] getProtocolVersionForRpcKind(RpcKind rpcKind,
String protocol) throws ClassNotFoundException {
Class<?> protocolClass = Class.forName(protocol);
String protocolName = RPC.getProtocolName(protocolClass);
VerProtocolImpl[] vers = server.getSupportedProtocolVersions(rpcKind,
protocolName);
if (vers == null) {
return null;
}
long [] versions = new long[vers.length];
for (int i=0; i<versions.length; i++) {
versions[i] = vers[i].version;
}
return versions;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This interface is implemented by the client side translators and can be used
* to obtain information about underlying protocol e.g. to check if a method is
* supported on the server side.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface ProtocolMetaInterface {
/**
* Checks whether the given method name is supported by the server.
* It is assumed that all method names are unique for a protocol.
* @param methodName The name of the method
* @return true if method is supported, otherwise false.
* @throws IOException
*/
public boolean isMethodSupported(String methodName) throws IOException;
}

View File

@ -183,7 +183,7 @@ public class ProtocolSignature implements Writable {
* @return its signature and finger print
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
Class <?> protocol, long serverVersion) {
String protocolName = RPC.getProtocolName(protocol);
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
@ -221,6 +221,12 @@ public class ProtocolSignature implements Writable {
return sig.signature;
}
public static ProtocolSignature getProtocolSignature(String protocolName,
long version) throws ClassNotFoundException {
Class<?> protocol = Class.forName(protocolName);
return getSigFingerprint(protocol, version).signature;
}
/**
* Get a server protocol's signature
*

View File

@ -41,6 +41,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
@ -49,6 +50,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.BlockingService;
/** A simple RPC mechanism.
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
@ -177,8 +180,8 @@ public class RPC {
}
// return the RpcEngine configured to handle a protocol
private static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
@ -522,7 +525,16 @@ public class RPC {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
/**
* Returns the server address for a given proxy.
*/
public static InetSocketAddress getServerAddress(Object proxy) {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(proxy);
return inv.getConnectionId().getAddress();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
@ -817,6 +829,19 @@ public class RPC {
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager);
initProtocolMetaInfo(conf);
}
private void initProtocolMetaInfo(Configuration conf)
throws IOException {
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(this);
BlockingService protocolInfoBlockingService = ProtocolInfoService
.newReflectiveBlockingService(xlator);
addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,
protocolInfoBlockingService);
}
/**

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class maintains a cache of protocol versions and corresponding protocol
* signatures, keyed by server address, protocol and rpc kind.
* The cache is lazily populated.
*/
public class RpcClientUtil {
private static RpcController NULL_CONTROLLER = null;
private static final int PRIME = 16777619;
private static class ProtoSigCacheKey {
private InetSocketAddress serverAddress;
private String protocol;
private String rpcKind;
ProtoSigCacheKey(InetSocketAddress addr, String p, String rk) {
this.serverAddress = addr;
this.protocol = p;
this.rpcKind = rk;
}
@Override //Object
public int hashCode() {
int result = 1;
result = PRIME * result
+ ((serverAddress == null) ? 0 : serverAddress.hashCode());
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
result = PRIME * result + ((rpcKind == null) ? 0 : rpcKind.hashCode());
return result;
}
@Override //Object
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof ProtoSigCacheKey) {
ProtoSigCacheKey otherKey = (ProtoSigCacheKey) other;
return (serverAddress.equals(otherKey.serverAddress) &&
protocol.equals(otherKey.protocol) &&
rpcKind.equals(otherKey.rpcKind));
}
return false;
}
}
private static ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>>
signatureMap = new ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>>();
private static void putVersionSignatureMap(InetSocketAddress addr,
String protocol, String rpcKind, Map<Long, ProtocolSignature> map) {
signatureMap.put(new ProtoSigCacheKey(addr, protocol, rpcKind), map);
}
private static Map<Long, ProtocolSignature> getVersionSignatureMap(
InetSocketAddress addr, String protocol, String rpcKind) {
return signatureMap.get(new ProtoSigCacheKey(addr, protocol, rpcKind));
}
/**
* Returns whether the given method is supported or not.
* The protocol signatures are fetched and cached. The connection id for the
* proxy provided is re-used.
* @param rpcProxy Proxy which provides an existing connection id.
* @param protocol Protocol for which the method check is required.
* @param rpcKind The RpcKind for which the method check is required.
* @param version The version at the client.
* @param methodName Name of the method.
* @return true if the method is supported, false otherwise.
* @throws IOException
*/
public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
RpcKind rpcKind, long version, String methodName) throws IOException {
InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy);
Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap(
serverAddress, protocol.getName(), rpcKind.toString());
if (versionMap == null) {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy,
conf);
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
GetProtocolSignatureResponseProto resp;
try {
resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
versionMap = convertProtocolSignatureProtos(resp
.getProtocolSignatureList());
putVersionSignatureMap(serverAddress, protocol.getName(),
rpcKind.toString(), versionMap);
}
// Assuming unique method names.
Method desiredMethod;
Method[] allMethods = protocol.getMethods();
desiredMethod = null;
for (Method m : allMethods) {
if (m.getName().equals(methodName)) {
desiredMethod = m;
break;
}
}
if (desiredMethod == null) {
return false;
}
int methodHash = ProtocolSignature.getFingerprint(desiredMethod);
return methodExists(methodHash, version, versionMap);
}
private static Map<Long, ProtocolSignature>
convertProtocolSignatureProtos(List<ProtocolSignatureProto> protoList) {
Map<Long, ProtocolSignature> map = new TreeMap<Long, ProtocolSignature>();
for (ProtocolSignatureProto p : protoList) {
int [] methods = new int[p.getMethodsList().size()];
int index=0;
for (int m : p.getMethodsList()) {
methods[index++] = m;
}
map.put(p.getVersion(), new ProtocolSignature(p.getVersion(), methods));
}
return map;
}
private static boolean methodExists(int methodHash, long version,
Map<Long, ProtocolSignature> versionMap) {
ProtocolSignature sig = versionMap.get(version);
if (sig != null) {
for (int m : sig.getMethods()) {
if (m == methodHash) {
return true;
}
}
}
return false;
}
// The proxy returned re-uses the underlying connection. This is a special
// mechanism for ProtocolMetaInfoPB.
// Don't do this for any other protocol, it might cause a security hole.
private static ProtocolMetaInfoPB getProtocolMetaInfoProxy(Object proxy,
Configuration conf) throws IOException {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(proxy);
return RPC
.getProtocolEngine(ProtocolMetaInfoPB.class, conf)
.getProtocolMetaInfoProxy(inv.getConnectionId(), conf,
NetUtils.getDefaultSocketFactory(conf)).getProxy();
}
}

View File

@ -26,6 +26,7 @@ import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -54,4 +55,16 @@ public interface RpcEngine {
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;
/**
* Returns a proxy for ProtocolMetaInfoPB, which uses the given connection
* id.
* @param connId, ConnectionId to be used for the proxy.
* @param conf, Configuration.
* @param factory, Socket factory.
* @return Proxy object.
* @throws IOException
*/
ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException;
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.Closeable;
import java.lang.reflect.InvocationHandler;
import org.apache.hadoop.ipc.Client.ConnectionId;
/**
* This interface must be implemented by all InvocationHandler
* implementations.
*/
public interface RpcInvocationHandler extends InvocationHandler, Closeable {
/**
* Returns the connection id associated with the InvocationHandler instance.
* @return ConnectionId
*/
ConnectionId getConnectionId();
}

View File

@ -21,18 +21,17 @@ package org.apache.hadoop.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.io.*;
import java.io.Closeable;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
@ -202,7 +201,7 @@ public class WritableRpcEngine implements RpcEngine {
private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler, Closeable {
private static class Invoker implements RpcInvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
@ -239,6 +238,11 @@ public class WritableRpcEngine implements RpcEngine {
CLIENTS.stopClient(client);
}
}
@Override
public ConnectionId getConnectionId() {
return remoteId;
}
}
// for unit testing only
@ -524,4 +528,11 @@ public class WritableRpcEngine implements RpcEngine {
}
}
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "ProtocolInfoProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
/**
* Request to get protocol versions for all supported rpc kinds.
*/
message GetProtocolVersionsRequestProto {
required string protocol = 1; // Protocol name
}
/**
* Protocol version with corresponding rpc kind.
*/
message ProtocolVersionProto {
required string rpcKind = 1; //RPC kind
repeated uint64 versions = 2; //Protocol version corresponding to the rpc kind.
}
/**
* Get protocol version response.
*/
message GetProtocolVersionsResponseProto {
repeated ProtocolVersionProto protocolVersions = 1;
}
/**
* Get protocol signature request.
*/
message GetProtocolSignatureRequestProto {
required string protocol = 1; // Protocol name
required string rpcKind = 2; // RPC kind
}
/**
* Get protocol signature response.
*/
message GetProtocolSignatureResponseProto {
repeated ProtocolSignatureProto protocolSignature = 1;
}
message ProtocolSignatureProto {
required uint64 version = 1;
repeated uint32 methods = 2;
}
/**
* Protocol to get information about protocols.
*/
service ProtocolInfoService {
/**
* Return protocol version corresponding to protocol interface for each
* supported rpc kind.
*/
rpc getProtocolVersions(GetProtocolVersionsRequestProto)
returns (GetProtocolVersionsResponseProto);
/**
* Return protocol version corresponding to protocol interface.
*/
rpc getProtocolSignature(GetProtocolSignatureRequestProto)
returns (GetProtocolSignatureResponseProto);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.conf;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -32,4 +34,22 @@ public class TestDeprecatedKeys extends TestCase {
String scriptFile = conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY);
assertTrue(scriptFile.equals("xyz")) ;
}
//Tests reading / writing a conf file with deprecation after setting
public void testReadWriteWithDeprecatedKeys() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean("old.config.yet.to.be.deprecated", true);
Configuration.addDeprecation("old.config.yet.to.be.deprecated",
new String[]{"new.conf.to.replace.deprecated.conf"});
ByteArrayOutputStream out=new ByteArrayOutputStream();
String fileContents;
try {
conf.writeXml(out);
fileContents = out.toString();
} finally {
out.close();
}
assertTrue(fileContents.contains("old.config.yet.to.be.deprecated"));
assertTrue(fileContents.contains("new.conf.to.replace.deprecated.conf"));
}
}

View File

@ -22,12 +22,22 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import org.apache.hadoop.conf.Configuration;
import junit.framework.TestCase;
import org.junit.*;
import static org.junit.Assert.*;
public class TestChecksumFileSystem extends TestCase {
public class TestChecksumFileSystem {
static final String TEST_ROOT_DIR
= System.getProperty("test.build.data","build/test/data/work-dir/localfs");
static LocalFileSystem localFs;
@Before
public void resetLocalFs() throws Exception {
localFs = FileSystem.getLocal(new Configuration());
localFs.setVerifyChecksum(true);
}
@Test
public void testgetChecksumLength() throws Exception {
assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512));
assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512));
@ -40,9 +50,8 @@ public class TestChecksumFileSystem extends TestCase {
ChecksumFileSystem.getChecksumLength(10000000000000L, 10));
}
@Test
public void testVerifyChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testPath");
Path testPath11 = new Path(TEST_ROOT_DIR, "testPath11");
FSDataOutputStream fout = localFs.create(testPath);
@ -68,7 +77,7 @@ public class TestChecksumFileSystem extends TestCase {
//copying the wrong checksum file
FileUtil.copy(localFs, localFs.getChecksumFile(testPath11), localFs,
localFs.getChecksumFile(testPath),false,true,conf);
localFs.getChecksumFile(testPath),false,true,localFs.getConf());
assertTrue("checksum exists", localFs.exists(localFs.getChecksumFile(testPath)));
boolean errorRead = false;
@ -80,20 +89,13 @@ public class TestChecksumFileSystem extends TestCase {
assertTrue("error reading", errorRead);
//now setting verify false, the read should succeed
try {
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
}
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing".equals(str));
}
@Test
public void testMultiChunkFile() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk");
FSDataOutputStream fout = localFs.create(testPath);
for (int i = 0; i < 1000; i++) {
@ -116,9 +118,8 @@ public class TestChecksumFileSystem extends TestCase {
* Test to ensure that if the checksum file is truncated, a
* ChecksumException is thrown
*/
@Test
public void testTruncatedChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc");
FSDataOutputStream fout = localFs.create(testPath);
fout.write("testing truncation".getBytes());
@ -146,14 +147,60 @@ public class TestChecksumFileSystem extends TestCase {
}
// telling it not to verify checksums, should avoid issue.
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing truncation".equals(str));
}
@Test
public void testStreamType() throws Exception {
Path testPath = new Path(TEST_ROOT_DIR, "testStreamType");
localFs.create(testPath).close();
FSDataInputStream in = null;
localFs.setVerifyChecksum(true);
in = localFs.open(testPath);
assertTrue("stream is input checker",
in.getWrappedStream() instanceof FSInputChecker);
localFs.setVerifyChecksum(false);
in = localFs.open(testPath);
assertFalse("stream is not input checker",
in.getWrappedStream() instanceof FSInputChecker);
}
@Test
public void testCorruptedChecksum() throws Exception {
Path testPath = new Path(TEST_ROOT_DIR, "testCorruptChecksum");
Path checksumPath = localFs.getChecksumFile(testPath);
// write a file to generate checksum
FSDataOutputStream out = localFs.create(testPath, true);
out.write("testing 1 2 3".getBytes());
out.close();
assertTrue(localFs.exists(checksumPath));
FileStatus stat = localFs.getFileStatus(checksumPath);
// alter file directly so checksum is invalid
out = localFs.getRawFileSystem().create(testPath, true);
out.write("testing stale checksum".getBytes());
out.close();
assertTrue(localFs.exists(checksumPath));
// checksum didn't change on disk
assertEquals(stat, localFs.getFileStatus(checksumPath));
Exception e = null;
try {
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing truncation".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
readFile(localFs, testPath, 1024);
} catch (ChecksumException ce) {
e = ce;
} finally {
assertNotNull("got checksum error", e);
}
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024);
assertEquals("testing stale checksum", str);
}
}

View File

@ -517,6 +517,23 @@ public class TestSequenceFile extends TestCase {
assertTrue("InputStream for " + path + " should have been closed.", openedFile[0].isClosed());
}
/**
* Test that makes sure createWriter succeeds on a file that was
* already created
* @throws IOException
*/
public void testCreateWriterOnExistingFile() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path name = new Path(new Path(System.getProperty("test.build.data","."),
"createWriterOnExistingFile") , "file");
fs.create(name);
SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
RandomDatum.class, 512, (short) 1, 4096, false,
CompressionType.NONE, null, new Metadata());
}
public void testRecursiveSeqFileCreate() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
@ -259,7 +260,13 @@ public class TestRPC {
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
return null;
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
}
/**

View File

@ -32,6 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Test;
@ -302,4 +305,72 @@ System.out.println("echo int is NOT supported");
ex.getMessage().contains("VersionMismatch"));
}
}
@Test
public void testIsMethodSupported() throws IOException {
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
TestProtocol2.versionID, addr, conf);
boolean supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_WRITABLE,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertTrue(supported);
supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertFalse(supported);
}
/**
* Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
* the server registry to extract protocol signatures and versions.
*/
@Test
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(server);
GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_PROTOCOL_BUFFER));
//No signatures should be found
Assert.assertEquals(0, resp.getProtocolSignatureCount());
resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_WRITABLE));
Assert.assertEquals(1, resp.getProtocolSignatureCount());
ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
boolean found = false;
int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
.getMethod("echo", String.class));
for (int m : sig.getMethodsList()) {
if (expected == m) {
found = true;
break;
}
}
Assert.assertTrue(found);
}
private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
Class<?> protocol, RpcKind rpcKind) {
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
return builder.build();
}
}

View File

@ -18,4 +18,4 @@
OK_RELEASEAUDIT_WARNINGS=0
OK_FINDBUGS_WARNINGS=0
OK_JAVADOC_WARNINGS=2
OK_JAVADOC_WARNINGS=0

View File

@ -53,6 +53,11 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>

View File

@ -55,8 +55,8 @@ if [ "${1}" = "stop" ]; then
fi
if [ "${HTTPFS_SILENT}" != "true" ]; then
${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@"
${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@"
else
${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@" > /dev/null
${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@" > /dev/null
fi

View File

@ -365,6 +365,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2836. HttpFSServer still has 2 javadoc warnings in trunk (revans2 via tucu)
HDFS-2837. mvn javadoc:javadoc not seeing LimitedPrivate class (revans2 via tucu)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -22,8 +22,6 @@ bin=`which "$0"`
bin=`dirname "${bin}"`
bin=`cd "$bin"; pwd`
export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then

View File

@ -79,6 +79,9 @@ Trunk (unreleased changes)
MAPREDUCE-3664. Federation Documentation has incorrect configuration example.
(Brandon Li via jitendra)
MAPREDUCE-3740. Fixed broken mapreduce compilation after the patch for
HADOOP-7965. (Devaraj K via vinodkv)
Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES
@ -197,6 +200,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-2765. DistCp Rewrite. (Mithun Radhakrishnan via mahadev)
MAPREDUCE-3737. The Web Application Proxy's is not documented very well.
(Robert Evans via mahadev)
MAPREDUCE-3699. Increased RPC handlers for all YARN servers to reasonable
values for working at scale. (Hitesh Shah via vinodkv)
MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml.
(Roman Shapshonik via acmurthy)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -220,6 +232,12 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush
for every event slowing down AM. (Siddarth Seth via vinodkv)
MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh
Shah via sseth)
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
@ -576,6 +594,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3733. Add Apache License Header to hadoop-distcp/pom.xml.
(mahadev)
MAPREDUCE-3735. Add distcp jar to the distribution (tar).
(mahadev)
MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific
information not available at RM. (vinodkv via acmurthy)
MAPREDUCE-3742. "yarn logs" command fails with ClassNotFoundException.
(Jason Lowe via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -91,15 +91,15 @@ if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then
fi
# for releases, add core mapred jar & webapps to CLASSPATH
if [ -d "$HADOOP_PREFIX/share/hadoop/mapreduce/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/mapreduce
if [ -d "$HADOOP_PREFIX/${MAPRED_DIR}/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/${MAPRED_DIR}
fi
for f in $HADOOP_MAPRED_HOME/share/hadoop-mapreduce/*.jar; do
for f in $HADOOP_MAPRED_HOME/${MAPRED_DIR}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# add libs to CLASSPATH
for f in $HADOOP_MAPRED_HOME/lib/*.jar; do
for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done

View File

@ -455,10 +455,14 @@ public class Job extends JobContextImpl implements JobContext {
public String toString() {
ensureState(JobState.RUNNING);
String reasonforFailure = " ";
int numMaps = 0;
int numReduces = 0;
try {
updateStatus();
if (status.getState().equals(JobStatus.State.FAILED))
reasonforFailure = getTaskFailureEventString();
numMaps = getTaskReports(TaskType.MAP).length;
numReduces = getTaskReports(TaskType.REDUCE).length;
} catch (IOException e) {
} catch (InterruptedException ie) {
}
@ -468,6 +472,8 @@ public class Job extends JobContextImpl implements JobContext {
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("Number of maps: ").append(numMaps);
sb.append("Number of reduces: ").append(numReduces);
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");

View File

@ -412,12 +412,12 @@ public interface MRJobConfig {
/** The number of threads used to handle task RPC calls.*/
public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
MR_AM_PREFIX + "job.task.listener.thread-count";
public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 10;
public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30;
/** How often the AM should send heartbeats to the RM.*/
public static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS =
MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
/**
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.tools;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@ -28,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
@ -579,25 +581,28 @@ public class CLI extends Configured implements Tool {
}
}
}
public void displayJobList(JobStatus[] jobs)
throws IOException, InterruptedException {
System.out.println("Total jobs:" + jobs.length);
System.out.println("JobId\tState\tStartTime\t" +
"UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" +
"RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info");
for (JobStatus job : jobs) {
TaskReport[] mapReports =
cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP);
TaskReport[] reduceReports =
cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE);
displayJobList(jobs, new PrintWriter(System.out));
}
System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n",
@Private
public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
@Private
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
writer.println("Total jobs:" + jobs.length);
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
mapReports.length,
reduceReports.length,
job.getNumUsedSlots(),
job.getNumReservedSlots(),
job.getUsedMem(),
@ -605,6 +610,7 @@ public class CLI extends Configured implements Tool {
job.getNeededMem(),
job.getSchedulingInfo());
}
writer.flush();
}
public static void main(String[] argv) throws Exception {

View File

@ -440,6 +440,16 @@
</description>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native</value>
<description>Expert: Additional execution environment entries for
map and reduce task processes. This is not an additive property.
You must preserve the original value if you want your map and
reduce tasks to have access to native libraries (compression, etc).
</description>
</property>
<property>
<name>mapreduce.task.tmp.dir</name>
<value>./tmp</value>
@ -1224,4 +1234,18 @@
mapreduce.job.end-notification.max.retry.interval</description>
</property>
<property>
<name>yarn.app.mapreduce.am.job.task.listener.thread-count</name>
<value>30</value>
<description>The number of threads used to handle RPC calls in the
MR AppMaster from remote tasks</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
<description>The interval in ms at which the MR AppMaster should send
heartbeats to the ResourceManager</description>
</property>
</configuration>

View File

@ -22,19 +22,24 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import java.io.PrintWriter;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Assert;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class JobClientUnitTest {
public class TestJobClient extends JobClient {
@ -48,7 +53,6 @@ public class JobClientUnitTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void testMapTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -64,7 +68,6 @@ public class JobClientUnitTest {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testReduceTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -80,7 +83,6 @@ public class JobClientUnitTest {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testSetupTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -96,7 +98,6 @@ public class JobClientUnitTest {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testCleanupTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -115,12 +116,15 @@ public class JobClientUnitTest {
@Test
public void testShowJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
JobID jobID = new JobID("test", 0);
long startTime = System.currentTimeMillis();
JobID jobID = new JobID(String.valueOf(startTime), 12345);
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobStatus.getJobID()).thenReturn(jobID);
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
when(mockJobStatus.getStartTime()).thenReturn(0L);
when(mockJobStatus.getStartTime()).thenReturn(startTime);
when(mockJobStatus.getUsername()).thenReturn("mockuser");
when(mockJobStatus.getQueue()).thenReturn("mockqueue");
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
@ -132,18 +136,21 @@ public class JobClientUnitTest {
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
Job mockJob = mock(Job.class);
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]);
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(
new TaskReport[5]);
Cluster mockCluster = mock(Cluster.class);
when(mockCluster.getJob(jobID)).thenReturn(mockJob);
client.setCluster(mockCluster);
client.displayJobList(new JobStatus[] {mockJobStatus});
ByteArrayOutputStream out = new ByteArrayOutputStream();
client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out));
String commandLineOutput = out.toString();
System.out.println(commandLineOutput);
Assert.assertTrue(commandLineOutput.contains("Total jobs:1"));
verify(mockJobStatus, atLeastOnce()).getJobID();
verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class));
verify(mockCluster, atLeastOnce()).getJob(jobID);
verify(mockJobStatus).getState();
verify(mockJobStatus).getStartTime();
verify(mockJobStatus).getUsername();
@ -155,5 +162,9 @@ public class JobClientUnitTest {
verify(mockJobStatus).getReservedMem();
verify(mockJobStatus).getNeededMem();
verify(mockJobStatus).getSchedulingInfo();
// This call should not go to each AM.
verify(mockCluster, never()).getJob(jobID);
verify(mockJob, never()).getTaskReports(isA(TaskType.class));
}
}

View File

@ -140,8 +140,8 @@ if [ -d "$YARN_HOME/build/tools" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/build/tools
fi
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_DIR}/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_LIB_JARS_DIR}/*
# so that filenames w/ spaces are handled correctly in loops below
IFS=
@ -194,7 +194,7 @@ elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "logs" ] ; then
CLASS=org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper
CLASS=org.apache.hadoop.yarn.logaggregation.LogDumper
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel

View File

@ -19,8 +19,6 @@ bin=`which "$0"`
bin=`dirname "${bin}"`
bin=`cd "$bin"; pwd`
export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then

View File

@ -90,7 +90,7 @@ public class YarnConfiguration extends Configuration {
/** The number of threads used to handle applications manager requests.*/
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 50;
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
@ -106,7 +106,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
RM_PREFIX + "scheduler.client.thread-count";
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
/** The address of the RM web application.*/
public static final String RM_WEBAPP_ADDRESS =
@ -184,7 +184,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads to handle resource tracker calls.*/
public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT =
RM_PREFIX + "resource-tracker.client.thread-count";
public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50;
/** The class to use as the resource scheduler.*/
public static final String RM_SCHEDULER =
@ -257,7 +257,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads container manager uses.*/
public static final String NM_CONTAINER_MGR_THREAD_COUNT =
NM_PREFIX + "container-manager.thread-count";
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 5;
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
/** Number of threads used in cleanup.*/
public static final String NM_DELETE_THREAD_COUNT =

View File

@ -36,10 +36,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.ClientCache;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -73,6 +75,17 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements InvocationHandler, Closeable {
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
@ -82,8 +95,13 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
}
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory,
ProtoSpecificResponseWritable.class);
}

View File

@ -67,7 +67,7 @@
<property>
<description>The number of threads used to handle applications manager requests.</description>
<name>yarn.resourcemanager.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -90,7 +90,7 @@
<property>
<description>Number of threads to handle scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -179,7 +179,7 @@
<property>
<description>Number of threads to handle resource tracker calls.</description>
<name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -244,7 +244,7 @@
<property>
<description>Number of threads container manager uses.</description>
<name>yarn.nodemanager.container-manager.thread-count</name>
<value>5</value>
<value>20</value>
</property>
<property>

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -39,9 +38,9 @@ public class ClusterMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
@Metric("# of NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableCounterInt numLostNMs;
@Metric("# of active NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
@ -73,8 +72,8 @@ public class ClusterMetrics {
}
}
//Total Nodemanagers
public int getNumNMs() {
//Active Nodemanagers
public int getNumActiveNMs() {
return numNMs.value();
}
@ -87,6 +86,10 @@ public class ClusterMetrics {
numDecommissionedNMs.incr();
}
public void decrDecommisionedNMs() {
numDecommissionedNMs.decr();
}
//Lost NMs
public int getNumLostNMs() {
return numLostNMs.value();
@ -96,6 +99,10 @@ public class ClusterMetrics {
numLostNMs.incr();
}
public void decrNumLostNMs() {
numLostNMs.decr();
}
//Unhealthy NMs
public int getUnhealthyNMs() {
return numUnhealthyNMs.value();
@ -118,6 +125,10 @@ public class ClusterMetrics {
numRebootedNMs.incr();
}
public void decrNumRebootedNMs() {
numRebootedNMs.decr();
}
public void removeNode(RMNodeEventType nodeEventType) {
numNMs.decr();
switch(nodeEventType){

View File

@ -43,6 +43,8 @@ public interface RMContext {
ApplicationsStore getApplicationsStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes();

View File

@ -43,6 +43,9 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<NodeId, RMNode> nodes
= new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private AMLivelinessMonitor amLivelinessMonitor;
private ContainerAllocationExpirer containerAllocationExpirer;
@ -83,6 +86,11 @@ public class RMContextImpl implements RMContext {
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Override
public ContainerAllocationExpirer getContainerAllocationExpirer() {

View File

@ -220,10 +220,6 @@ public class ResourceTrackerService extends AbstractService implements
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
// Updating the metrics directly as reboot event cannot be
// triggered on a null rmNode
ClusterMetrics.getMetrics().incrNumRebootedNMs();
return reboot;
}

View File

@ -119,7 +119,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
RMNodeEventType.EXPIRE, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
.addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
RMNodeEventType.REBOOTING, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@ -307,6 +307,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private void updateMetrics(RMNodeState nodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (nodeState) {
case LOST:
metrics.decrNumLostNMs();
break;
case REBOOTED:
metrics.decrNumRebootedNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
}
}
@SuppressWarnings("unchecked")
@Override
@ -315,6 +330,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
RMNode node = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
updateMetrics(node.getState());
}
ClusterMetrics.getMetrics().addNode();
}
@ -353,7 +375,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Remove the node from the system.
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
ClusterMetrics.getMetrics().removeNode(event.getType());
}

View File

@ -68,7 +68,7 @@ public class MetricsOverviewTable extends HtmlBlock {
th().$class("ui-state-default")._("Memory Used")._().
th().$class("ui-state-default")._("Memory Total")._().
th().$class("ui-state-default")._("Memory Reserved")._().
th().$class("ui-state-default")._("Total Nodes")._().
th().$class("ui-state-default")._("Active Nodes")._().
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
@ -82,7 +82,7 @@ public class MetricsOverviewTable extends HtmlBlock {
td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)).
td().a(url("nodes"),String.valueOf(clusterMetrics.getTotalNodes()))._().
td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().

View File

@ -24,6 +24,8 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -79,7 +82,19 @@ class NodesPage extends RmView {
if(type != null && !type.isEmpty()) {
stateFilter = RMNodeState.valueOf(type.toUpperCase());
}
for (RMNode ni : this.rmContext.getRMNodes().values()) {
Collection<RMNode> rmNodes = this.rmContext.getRMNodes().values();
boolean isInactive = false;
if (stateFilter != null) {
switch (stateFilter) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rmContext.getInactiveRMNodes().values();
isInactive = true;
break;
}
}
for (RMNode ni : rmNodes) {
if(stateFilter != null) {
RMNodeState state = ni.getState();
if(!stateFilter.equals(state)) {
@ -89,12 +104,17 @@ class NodesPage extends RmView {
NodeInfo info = new NodeInfo(ni, sched);
int usedMemory = (int)info.getUsedMemory();
int availableMemory = (int)info.getAvailableMemory();
tbody.tr().
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr().
td(info.getRack()).
td(info.getState()).
td(info.getNodeId()).
td().a("http://" + info.getNodeHTTPAddress(), info.getNodeHTTPAddress())._().
td(info.getHealthStatus()).
td(info.getNodeId());
if (isInactive) {
row.td()._("N/A")._();
} else {
String httpAddress = info.getNodeHTTPAddress();
row.td().a("http://" + httpAddress, httpAddress)._();
}
row.td(info.getHealthStatus()).
td(Times.format(info.getLastHealthUpdate())).
td(info.getHealthReport()).
td(String.valueOf(info.getNumContainers())).

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest;
@ -68,6 +69,7 @@ import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/cluster")
public class RMWebServices {
private static final String EMPTY = "";
private static final Log LOG = LogFactory.getLog(RMWebServices.class);
private final ResourceManager rm;
private static RecordFactory recordFactory = RecordFactoryProvider
@ -144,12 +146,23 @@ public class RMWebServices {
if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance");
}
Collection<RMNode> rmNodes = this.rm.getRMContext().getRMNodes().values();
boolean isInactive = false;
if (filterState != null && !filterState.isEmpty()) {
RMNodeState nodeState = RMNodeState.valueOf(filterState.toUpperCase());
switch (nodeState) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
isInactive = true;
break;
}
}
NodesInfo allNodes = new NodesInfo();
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) {
for (RMNode ni : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (filterState != null) {
RMNodeState.valueOf(filterState);
if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
continue;
}
@ -165,6 +178,9 @@ public class RMWebServices {
continue;
}
}
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
allNodes.add(nodeInfo);
}
return allNodes;
@ -183,10 +199,19 @@ public class RMWebServices {
}
NodeId nid = ConverterUtils.toNodeId(nodeId);
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
boolean isInactive = false;
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
}
isInactive = true;
}
return new NodeInfo(ni, sched);
NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
return nodeInfo;
}
@GET

View File

@ -44,6 +44,7 @@ public class ClusterMetricsInfo {
protected int unhealthyNodes;
protected int decommissionedNodes;
protected int rebootedNodes;
protected int activeNodes;
public ClusterMetricsInfo() {
} // JAXB needs this
@ -59,12 +60,13 @@ public class ClusterMetricsInfo {
this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB;
this.containersAllocated = metrics.getAllocatedContainers();
this.totalMB = availableMB + reservedMB + allocatedMB;
this.totalNodes = clusterMetrics.getNumNMs();
this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes;
}
public int getAppsSubmitted() {
@ -94,6 +96,10 @@ public class ClusterMetricsInfo {
public int getTotalNodes() {
return this.totalNodes;
}
public int getActiveNodes() {
return this.activeNodes;
}
public int getLostNodes() {
return this.lostNodes;

View File

@ -94,6 +94,10 @@ public class NodeInfo {
public String getNodeHTTPAddress() {
return this.nodeHTTPAddress;
}
public void setNodeHTTPAddress(String nodeHTTPAddress) {
this.nodeHTTPAddress = nodeHTTPAddress;
}
public String getHealthStatus() {
return this.healthStatus;

View File

@ -81,13 +81,20 @@ public class MockNM {
}
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
b, ++responseId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
return nodeHeartbeat(conts, isHealthy, ++responseId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
status.setContainersStatuses(entry.getValue());
@ -97,7 +104,6 @@ public class MockNM {
healthStatus.setIsNodeHealthy(isHealthy);
healthStatus.setLastHealthReportTime(1);
status.setNodeHealthStatus(healthStatus);
status.setResponseId(++responseId);
req.setNodeStatus(status);
return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
}

View File

@ -56,6 +56,17 @@ public class MockNodes {
}
return list;
}
public static List<RMNode> lostNodes(int racks, int nodesPerRack,
Resource perNode) {
List<RMNode> list = Lists.newArrayList();
for (int i = 0; i < racks; ++i) {
for (int j = 0; j < nodesPerRack; ++j) {
list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
}
}
return list;
}
public static NodeId newNodeID(String host, int port) {
NodeId nid = recordFactory.newRecordInstance(NodeId.class);
@ -82,92 +93,120 @@ public class MockNodes {
return rs;
}
public static RMNode newNodeInfo(int rack, final Resource perNode) {
private static class MockRMNodeImpl implements RMNode {
private NodeId nodeId;
private String hostName;
private String nodeAddr;
private String httpAddress;
private int cmdPort;
private Resource perNode;
private String rackName;
private NodeHealthStatus nodeHealthStatus;
private RMNodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, NodeHealthStatus nodeHealthStatus,
int cmdPort, String hostName, RMNodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
this.perNode = perNode;
this.rackName = rackName;
this.nodeHealthStatus = nodeHealthStatus;
this.cmdPort = cmdPort;
this.hostName = hostName;
this.state = state;
}
@Override
public NodeId getNodeID() {
return this.nodeId;
}
@Override
public String getHostName() {
return this.hostName;
}
@Override
public int getCommandPort() {
return this.cmdPort;
}
@Override
public int getHttpPort() {
return 0;
}
@Override
public String getNodeAddress() {
return this.nodeAddr;
}
@Override
public String getHttpAddress() {
return this.httpAddress;
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return this.nodeHealthStatus;
}
@Override
public Resource getTotalCapability() {
return this.perNode;
}
@Override
public String getRackName() {
return this.rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public RMNodeState getState() {
return this.state;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
return null;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
final String rackName = "rack"+ rack;
final int nid = NODE_ID++;
final String hostName = "host"+ nid;
final int port = 123;
final NodeId nodeID = newNodeID(hostName, port);
final String httpAddress = "localhost:0";
final String httpAddress = httpAddr;
final NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class);
final Resource used = newUsedResource(perNode);
final Resource avail = newAvailResource(perNode, used);
return new RMNode() {
@Override
public NodeId getNodeID() {
return nodeID;
}
return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
nodeHealthStatus, nid, hostName, state);
}
@Override
public String getNodeAddress() {
return hostName;
}
public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
return buildRMNode(rack, perNode, state, "N/A");
}
@Override
public String getHttpAddress() {
return httpAddress;
}
@Override
public Resource getTotalCapability() {
return perNode;
}
@Override
public String getRackName() {
return rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return nodeHealthStatus;
}
@Override
public int getCommandPort() {
return nid;
}
@Override
public int getHttpPort() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String getHostName() {
return hostName;
}
@Override
public RMNodeState getState() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
// TODO Auto-generated method stub
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
// TODO Auto-generated method stub
return null;
}
};
public static RMNode newNodeInfo(int rack, final Resource perNode) {
return buildRMNode(rack, perNode, null, "localhost:0");
}
}

View File

@ -130,6 +130,12 @@ public class MockRM extends ResourceManager {
nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
}
public void sendNodeLost(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
}
public void NMwaitForState(NodeId nodeid, RMNodeState finalState)
throws Exception {

View File

@ -31,6 +31,7 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
@ -100,8 +101,8 @@ public class TestRMNodeTransitions {
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
}

View File

@ -157,14 +157,14 @@ public class TestResourceTrackerService {
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService());
MockNM nm2 = rm.registerNode("host2:1234", 2048);
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true);
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount);
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -36,39 +37,65 @@ import com.google.inject.Module;
* data for all the columns in the table as specified in the header.
*/
public class TestNodesPage {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
@Test
public void testNodesBlockRender() throws Exception {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
Injector injector = WebAppTests.createMockInjector(RMContext.class,
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB),
new Module() {
private Injector injector;
@Before
public void setUp() throws Exception {
injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp
.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB), new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(TestRMWebApp.mockRm(3,
numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB));
binder.bind(ResourceManager.class).toInstance(
TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testNodesBlockRender() throws Exception {
injector.getInstance(NodesBlock.class).render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer, Mockito.times(numberOfActualTableHeaders +
numberOfThInMetricsTable)).print(
"<th");
Mockito.verify(writer,
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print("<td");
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}
@Test
public void testNodesBlockRenderForLostNodes() {
NodesBlock nodesBlock = injector.getInstance(NodesBlock.class);
nodesBlock.set("node.state", "lost");
nodesBlock.render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer,
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}
}

View File

@ -120,12 +120,23 @@ public class TestRMWebApp {
for (RMNode node : nodes) {
nodesMap.put(node.getNodeID(), node);
}
final List<RMNode> lostNodes = MockNodes.lostNodes(racks, numNodes,
newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> lostNodesMap = Maps.newConcurrentMap();
for (RMNode node : lostNodes) {
lostNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(new MemStore(), null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return applicationsMaps;
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return lostNodesMap;
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return nodesMap;
}

View File

@ -370,7 +370,8 @@ public class TestRMWebServices extends JerseyTest {
WebServicesTestUtils.getXmlInt(element, "lostNodes"),
WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"),
WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"),
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"));
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"),
WebServicesTestUtils.getXmlInt(element, "activeNodes"));
}
}
@ -378,7 +379,7 @@ public class TestRMWebServices extends JerseyTest {
Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 11, clusterinfo.length());
assertEquals("incorrect number of elements", 12, clusterinfo.length());
verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
clusterinfo.getInt("allocatedMB"),
@ -386,13 +387,13 @@ public class TestRMWebServices extends JerseyTest {
clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
clusterinfo.getInt("decommissionedNodes"),
clusterinfo.getInt("rebootedNodes"));
clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
}
public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
int allocMB, int containersAlloc, int totalMB, int totalNodes,
int lostNodes, int unhealthyNodes, int decommissionedNodes,
int rebootedNodes) throws JSONException, Exception {
int rebootedNodes, int activeNodes) throws JSONException, Exception {
ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics();
@ -412,8 +413,11 @@ public class TestRMWebServices extends JerseyTest {
* MB_IN_GB, allocMB);
assertEquals("containersAllocated doesn't match", 0, containersAlloc);
assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
assertEquals("totalNodes doesn't match", clusterMetrics.getNumNMs(),
totalNodes);
assertEquals(
"totalNodes doesn't match",
clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
+ clusterMetrics.getNumDecommisionedNMs()
+ clusterMetrics.getNumRebootedNMs(), totalNodes);
assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
lostNodes);
assertEquals("unhealthyNodes doesn't match",
@ -422,6 +426,8 @@ public class TestRMWebServices extends JerseyTest {
clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes);
assertEquals("rebootedNodes doesn't match",
clusterMetrics.getNumRebootedNMs(), rebootedNodes);
assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
activeNodes);
}
@Test

View File

@ -202,6 +202,69 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.stop();
}
}
@Test
public void testNodesQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", RMNodeState.LOST.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node");
assertEquals("incorrect number of elements", 2, nodeArray.length());
for (int i = 0; i < nodeArray.length(); ++i) {
JSONObject info = nodeArray.getJSONObject(i);
String host = info.get("id").toString().split(":")[0];
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
}
}
@Test
public void testSingleNodeQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").path("h2:1234").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject info = json.getJSONObject("node");
String id = info.get("id").toString();
assertEquals("Incorrect Node Information.", "h2:1234", id);
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2");
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
@Test
public void testNodesQueryHealthy() throws JSONException, Exception {

View File

@ -0,0 +1,49 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
YARN
---
---
${maven.build.timestamp}
Web Application Proxy
The Web Application Proxy is part of YARN. By default it will run as part of
the Resource Manager(RM), but can be configured to run in stand alone mode.
The reason for the proxy is to reduce the possibility of web based attacks
through YARN.
In YARN the Application Master(AM) has the responsibility to provide a web UI
and to send that link to the RM. This opens up a number of potential
issues. The RM runs as a trusted user, and people visiting that web
address will treat it, and links it provides to them as trusted, when in
reality the AM is running as a non-trusted user, and the links it gives to
the RM could point to anything malicious or otherwise. The Web Application
Proxy mitigates this risk by warning users that do not own the given
application that they are connecting to an untrusted site.
In addition to this the proxy also tries to reduce the impact that a malicious
AM could have on a user. It primarily does this by stripping out cookies from
the user, and replacing them with a single cookie providing the user name of
the logged in user. This is because most web based authentication systems will
identify a user based off of a cookie. By providing this cookie to an
untrusted application it opens up the potential for an exploit. If the cookie
is designed properly that potential should be fairly minimal, but this is just
to reduce that potential attack vector. The current proxy implementation does
nothing to prevent the AM from providing links to malicious external sites,
nor does it do anything to prevent malicious javascript code from running as
well. In fact javascript can be used to get the cookies, so stripping the
cookies from the request has minimal benefit at this time.
In the future we hope to address the attack vectors described above and make
attaching to an AM's web UI safer.

View File

@ -47,4 +47,6 @@ MapReduce NextGen aka YARN aka MRv2
* {{{./CapacityScheduler.html}Capacity Scheduler}}
* {{{./WebApplicationProxy.html}Web Application Proxy}}

View File

@ -223,6 +223,11 @@
<artifactId>hadoop-archives</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-rumen</artifactId>

View File

@ -61,6 +61,7 @@
<item name="YARN Architecture" href="hadoop-yarn/hadoop-yarn-site/YARN.html"/>
<item name="Writing Yarn Applications" href="hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"/>
<item name="Capacity Scheduler" href="hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html"/>
<item name="Web Application Proxy" href="hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html"/>
</menu>
<menu name="YARN REST API's" inherit="top">

View File

@ -20,7 +20,7 @@
<version>0.24.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop.tools</groupId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>0.24.0-SNAPSHOT</version>
<description>Apache Hadoop Distributed Copy</description>

View File

@ -38,6 +38,11 @@
<artifactId>hadoop-streaming</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>