Merge r1517887 through r1518850 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1519796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-09-03 19:01:19 +00:00
commit 336c0344f5
47 changed files with 997 additions and 164 deletions

View File

@ -327,6 +327,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9487 Deprecation warnings in Configuration should go to their
own log or otherwise be suppressible (Chu Tong via stevel)
HADOOP-9889. Refresh the Krb5 configuration when creating a new kdc in
Hadoop-MiniKDC (Wei Yan via Sandy Ryza)
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@ -362,6 +365,9 @@ Release 2.1.1-beta - UNRELEASED
IMPROVEMENTS
HADOOP-9910. proxy server start and stop documentation wrong
(Andre Kelpe via harsh)
HADOOP-9446. Support Kerberos SPNEGO for IBM JDK. (Yu Gao via llu)
HADOOP-9787. ShutdownHelper util to shutdown threads and threadpools.
@ -387,6 +393,9 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9886. Turn warning message in RetryInvocationHandler to debug (arpit)
HADOOP-9906. Move HAZKUtil to o.a.h.util.ZKUtil and make inner-classes
public (Karthik Kambatla via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
@ -423,6 +432,9 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9887. globStatus does not correctly handle paths starting with a drive
spec on Windows. (Chuan Liu via cnauroth)
HADOOP-9894. Race condition in Shell leads to logged error stream handling
exceptions (Arpit Agarwal)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;
@ -47,7 +47,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
*

View File

@ -36,7 +36,8 @@
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
@ -313,18 +314,18 @@ private void initZK() throws HadoopIllegalArgumentException, IOException,
ZK_SESSION_TIMEOUT_DEFAULT);
// Parse ACLs from configuration.
String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
if (zkAcls.isEmpty()) {
zkAcls = Ids.CREATOR_ALL_ACL;
}
// Parse authentication from configuration.
String zkAuthConf = conf.get(ZK_AUTH_KEY);
zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
List<ZKAuthInfo> zkAuths;
if (zkAuthConf != null) {
zkAuths = HAZKUtil.parseAuth(zkAuthConf);
zkAuths = ZKUtil.parseAuth(zkAuthConf);
} else {
zkAuths = Collections.emptyList();
}

View File

@ -515,8 +515,13 @@ public void run() {
} catch (IOException ioe) {
LOG.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
try {
if (!completed.get()) {
errThread.interrupt();
errThread.join();
}
} catch (InterruptedException ie) {
LOG.warn("Interrupted while joining errThread");
}
try {
errReader.close();

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
package org.apache.hadoop.util;
import java.io.File;
import java.io.IOException;
@ -36,7 +36,7 @@
* Utilities for working with ZooKeeper.
*/
@InterfaceAudience.Private
public class HAZKUtil {
public class ZKUtil {
/**
* Parse ACL permission string, partially borrowed from
@ -76,9 +76,10 @@ private static int getPermFromString(String permString) {
* <code>sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa</code>
*
* @return ACL list
* @throws HadoopIllegalArgumentException if an ACL is invalid
* @throws {@link BadAclFormatException} if an ACL is invalid
*/
public static List<ACL> parseACLs(String aclString) {
public static List<ACL> parseACLs(String aclString) throws
BadAclFormatException {
List<ACL> acl = Lists.newArrayList();
if (aclString == null) {
return acl;
@ -113,8 +114,10 @@ public static List<ACL> parseACLs(String aclString) {
*
* @param authString the comma-separated auth mechanisms
* @return a list of parsed authentications
* @throws {@link BadAuthFormatException} if the auth format is invalid
*/
public static List<ZKAuthInfo> parseAuth(String authString) {
public static List<ZKAuthInfo> parseAuth(String authString) throws
BadAuthFormatException{
List<ZKAuthInfo> ret = Lists.newArrayList();
if (authString == null) {
return ret;
@ -161,7 +164,8 @@ public static String resolveConfIndirection(String valInConf)
/**
* An authentication token passed to ZooKeeper.addAuthInfo
*/
static class ZKAuthInfo {
@InterfaceAudience.Private
public static class ZKAuthInfo {
private final String scheme;
private final byte[] auth;
@ -171,29 +175,32 @@ public ZKAuthInfo(String scheme, byte[] auth) {
this.auth = auth;
}
String getScheme() {
public String getScheme() {
return scheme;
}
byte[] getAuth() {
public byte[] getAuth() {
return auth;
}
}
static class BadAclFormatException extends HadoopIllegalArgumentException {
@InterfaceAudience.Private
public static class BadAclFormatException extends
HadoopIllegalArgumentException {
private static final long serialVersionUID = 1L;
public BadAclFormatException(String message) {
super(message);
}
}
static class BadAuthFormatException extends HadoopIllegalArgumentException {
@InterfaceAudience.Private
public static class BadAuthFormatException extends
HadoopIllegalArgumentException {
private static final long serialVersionUID = 1L;
public BadAuthFormatException(String message) {
super(message);
}
}
}

View File

@ -518,7 +518,7 @@ $ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemana
are used with load balancing it should be run on each of them:
----
$ $HADOOP_YARN_HOME/bin/yarn start proxyserver --config $HADOOP_CONF_DIR
$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh start proxyserver --config $HADOOP_CONF_DIR
----
Start the MapReduce JobHistory Server with the following command, run on the
@ -560,7 +560,7 @@ $ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop nodemanag
balancing it should be run on each of them:
----
$ $HADOOP_YARN_HOME/bin/yarn stop proxyserver --config $HADOOP_CONF_DIR
$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh stop proxyserver --config $HADOOP_CONF_DIR
----

View File

@ -41,7 +41,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.test.GenericTestUtils;
public class TestActiveStandbyElector {

View File

@ -28,7 +28,7 @@
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.State;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Level;
import org.apache.zookeeper.ZooDefs.Ids;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
package org.apache.hadoop.util;
import static org.junit.Assert.*;
@ -24,8 +24,9 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.ha.HAZKUtil.BadAclFormatException;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.BadAclFormatException;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.junit.Test;
@ -33,9 +34,9 @@
import com.google.common.base.Charsets;
import com.google.common.io.Files;
public class TestHAZKUtil {
public class TestZKUtil {
private static final String TEST_ROOT_DIR = System.getProperty(
"test.build.data", "/tmp") + "/TestHAZKUtil";
"test.build.data", "/tmp") + "/TestZKUtil";
private static final File TEST_FILE = new File(TEST_ROOT_DIR,
"test-file");
@ -45,13 +46,13 @@ public class TestHAZKUtil {
@Test
public void testEmptyACL() {
List<ACL> result = HAZKUtil.parseACLs("");
List<ACL> result = ZKUtil.parseACLs("");
assertTrue(result.isEmpty());
}
@Test
public void testNullACL() {
List<ACL> result = HAZKUtil.parseACLs(null);
List<ACL> result = ZKUtil.parseACLs(null);
assertTrue(result.isEmpty());
}
@ -67,7 +68,7 @@ public void testInvalidACLs() {
private static void badAcl(String acls, String expectedErr) {
try {
HAZKUtil.parseACLs(acls);
ZKUtil.parseACLs(acls);
fail("Should have failed to parse '" + acls + "'");
} catch (BadAclFormatException e) {
assertEquals(expectedErr, e.getMessage());
@ -76,7 +77,7 @@ private static void badAcl(String acls, String expectedErr) {
@Test
public void testGoodACLs() {
List<ACL> result = HAZKUtil.parseACLs(
List<ACL> result = ZKUtil.parseACLs(
"sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca");
ACL acl0 = result.get(0);
assertEquals(Perms.CREATE | Perms.DELETE | Perms.READ |
@ -92,19 +93,19 @@ public void testGoodACLs() {
@Test
public void testEmptyAuth() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth("");
List<ZKAuthInfo> result = ZKUtil.parseAuth("");
assertTrue(result.isEmpty());
}
@Test
public void testNullAuth() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth(null);
List<ZKAuthInfo> result = ZKUtil.parseAuth(null);
assertTrue(result.isEmpty());
}
@Test
public void testGoodAuths() {
List<ZKAuthInfo> result = HAZKUtil.parseAuth(
List<ZKAuthInfo> result = ZKUtil.parseAuth(
"scheme:data,\n scheme2:user:pass");
assertEquals(2, result.size());
ZKAuthInfo auth0 = result.get(0);
@ -118,16 +119,16 @@ public void testGoodAuths() {
@Test
public void testConfIndirection() throws IOException {
assertNull(HAZKUtil.resolveConfIndirection(null));
assertEquals("x", HAZKUtil.resolveConfIndirection("x"));
assertNull(ZKUtil.resolveConfIndirection(null));
assertEquals("x", ZKUtil.resolveConfIndirection("x"));
TEST_FILE.getParentFile().mkdirs();
Files.write("hello world", TEST_FILE, Charsets.UTF_8);
assertEquals("hello world", HAZKUtil.resolveConfIndirection(
assertEquals("hello world", ZKUtil.resolveConfIndirection(
"@" + TEST_FILE.getAbsolutePath()));
try {
HAZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
ZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
fail("Did not throw for non-existent file reference");
} catch (FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().startsWith(BOGUS_FILE));

View File

@ -59,6 +59,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.text.MessageFormat;
@ -432,6 +433,17 @@ private void initKDCServer() throws Exception {
System.setProperty("sun.security.krb5.debug", conf.getProperty(DEBUG,
"false"));
// refresh the config
Class<?> classRef;
if (System.getProperty("java.vendor").contains("IBM")) {
classRef = Class.forName("com.ibm.security.krb5.internal.Config");
} else {
classRef = Class.forName("sun.security.krb5.Config");
}
Method refreshMethod = classRef.getMethod("refresh", new Class[0]);
refreshMethod.invoke(classRef, new Object[0]);
LOG.info("MiniKdc listening at port: {}", getPort());
LOG.info("MiniKdc setting JVM krb5.conf to: {}",
krb5conf.getAbsolutePath());

View File

@ -30,7 +30,11 @@
import javax.security.auth.login.LoginContext;
import java.io.File;
import java.security.Principal;
import java.util.*;
import java.util.Set;
import java.util.Map;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Arrays;
public class TestMiniKdc extends KerberosSecurityTestcase {

View File

@ -201,4 +201,8 @@ public static WriteStableHow fromValue(int id) {
public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "hdfs.nfs.exports.cache.expirytime.millis";
public static final long EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 15 * 60 * 1000; // 15 min
public static final String FILE_DUMP_DIR_KEY = "dfs.nfs3.dump.dir";
public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
}

View File

@ -126,6 +126,9 @@ private long updateNonSequentialWriteInMemory(long count) {
nonSequentialWriteInMemory = 0;
this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true;
nextOffset = latestAttr.getSize();
assert(nextOffset == this.fos.getPos());
ctxLock = new ReentrantLock(true);
}
@ -685,12 +688,14 @@ private void doSingleWrite(final WriteCtx writeCtx) {
try {
fos.write(data, 0, count);
if (fos.getPos() != (offset + count)) {
long flushedOffset = getFlushedOffset();
if (flushedOffset != (offset + count)) {
throw new IOException("output stream is out of sync, pos="
+ fos.getPos() + " and nextOffset should be" + (offset + count));
+ flushedOffset + " and nextOffset should be"
+ (offset + count));
}
nextOffset = fos.getPos();
nextOffset = flushedOffset;
// Reduce memory occupation size if request was allowed dumped
if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options;
@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private final Configuration config = new Configuration();
private final WriteManager writeManager;
private final IdUserGroup iug;// = new IdUserGroup();
private final IdUserGroup iug;
private final DFSClientCache clientCache;
private final NfsExports exports;
@ -161,10 +162,14 @@ public RpcProgramNfs3(Configuration config)
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
bufferSize = config.getInt("io.file.buffer.size", 4096);
bufferSize = config.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");
boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true);
writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
if (!enableDump) {
writeDumpDir = null;
} else {
@ -1112,6 +1117,7 @@ public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
}
}
@Override
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);

View File

@ -25,7 +25,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@ -48,6 +50,7 @@
public class WriteManager {
public static final Log LOG = LogFactory.getLog(WriteManager.class);
private final Configuration config;
private final IdUserGroup iug;
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
.newConcurrentMap();
@ -76,6 +79,7 @@ void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
WriteManager(IdUserGroup iug, final Configuration config) {
this.iug = iug;
this.config = config;
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
DEFAULT_STREAM_TIMEOUT);
@ -129,12 +133,41 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, count, request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
return;
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
HdfsDataOutputStream fos = null;
Nfs3FileAttributes latestAttr = null;
try {
int bufferSize = config.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
fos = dfsClient.append(fileIdPath, bufferSize, null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
} catch (IOException e) {
LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
if (fos != null) {
fos.close();
}
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
preOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, count, request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
return;
}
// Add open stream
String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+ fileHandle.getFileId());
addOpenFileStream(fileHandle, openFileCtx);
if (LOG.isDebugEnabled()) {
LOG.debug("opened stream for file:" + fileHandle.getFileId());
}
}
// Add write into the async job queue

View File

@ -307,6 +307,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-4947 Add NFS server export table to control export by hostname or
IP range (Jing Zhao via brandonli)
HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
to HDFS (brandonli)
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
@ -333,6 +336,12 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5045. Add more unit tests for retry cache to cover all AtMostOnce
methods. (jing9)
HDFS-3245. Add metrics and web UI for cluster version summary. (Ravi
Prakash via kihwal)
HDFS-5128. Allow multiple net interfaces to be used with HA namenode RPC
server. (kihwal)
OPTIMIZATIONS
BUG FIXES
@ -389,6 +398,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5124. DelegationTokenSecretManager#retrievePassword can cause deadlock
in NameNode. (Daryn Sharp via jing9)
HDFS-5132. Deadlock in NameNode between SafeModeMonitor#run and
DatanodeManager#handleHeartbeat. (kihwal)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -104,7 +104,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";
public static final String DFS_NAMENODE_MAX_OBJECTS_KEY = "dfs.namenode.max.objects";
public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0;
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_KEY = "dfs.namenode.safemode.extension";

View File

@ -47,6 +47,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
private long lastUpdate;
private int xceiverCount;
private String location = NetworkTopology.DEFAULT_RACK;
private String softwareVersion;
// Datanode administrative states
public enum AdminStates {
@ -383,4 +384,12 @@ public boolean equals(Object obj) {
// by DatanodeID
return (this == obj) || super.equals(obj);
}
public String getSoftwareVersion() {
return softwareVersion;
}
public void setSoftwareVersion(String softwareVersion) {
this.softwareVersion = softwareVersion;
}
}

View File

@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
@ -165,6 +166,14 @@ public class DatanodeManager {
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
/**
* The number of datanodes for each software version. This list should change
* during rolling upgrades.
* Software version -> Number of datanodes with this version
*/
private HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<String, Integer>(4, 0.75f);
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
@ -470,6 +479,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
heartbeatManager.removeDatanode(nodeInfo);
blockManager.removeBlocksAssociatedTo(nodeInfo);
networktopology.remove(nodeInfo);
decrementVersionCount(nodeInfo.getSoftwareVersion());
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo);
@ -552,6 +562,61 @@ private void wipeDatanode(final DatanodeID node) {
}
}
private void incrementVersionCount(String version) {
if (version == null) {
return;
}
synchronized(datanodeMap) {
Integer count = this.datanodesSoftwareVersions.get(version);
count = count == null ? 1 : count + 1;
this.datanodesSoftwareVersions.put(version, count);
}
}
private void decrementVersionCount(String version) {
if (version == null) {
return;
}
synchronized(datanodeMap) {
Integer count = this.datanodesSoftwareVersions.get(version);
if(count != null) {
if(count > 1) {
this.datanodesSoftwareVersions.put(version, count-1);
} else {
this.datanodesSoftwareVersions.remove(version);
}
}
}
}
private boolean shouldCountVersion(DatanodeDescriptor node) {
return node.getSoftwareVersion() != null && node.isAlive &&
!isDatanodeDead(node);
}
private void countSoftwareVersions() {
synchronized(datanodeMap) {
HashMap<String, Integer> versionCount = new HashMap<String, Integer>();
for(DatanodeDescriptor dn: datanodeMap.values()) {
// Check isAlive too because right after removeDatanode(),
// isDatanodeDead() is still true
if(shouldCountVersion(dn))
{
Integer num = versionCount.get(dn.getSoftwareVersion());
num = num == null ? 1 : num+1;
versionCount.put(dn.getSoftwareVersion(), num);
}
}
this.datanodesSoftwareVersions = versionCount;
}
}
public HashMap<String, Integer> getDatanodesSoftwareVersions() {
synchronized(datanodeMap) {
return new HashMap<String, Integer> (this.datanodesSoftwareVersions);
}
}
/* Resolve a node's network location */
private String resolveNetworkLocation (DatanodeID node) {
List<String> names = new ArrayList<String>(1);
@ -751,21 +816,28 @@ nodes with its data cleared (or user can just remove the StorageID
try {
// update cluster map
getNetworkTopology().remove(nodeS);
if(shouldCountVersion(nodeS)) {
decrementVersionCount(nodeS.getSoftwareVersion());
}
nodeS.updateRegInfo(nodeReg);
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
checkDecommissioning(nodeS);
success = true;
} finally {
if (!success) {
removeDatanode(nodeS);
wipeDatanode(nodeS);
countSoftwareVersions();
}
}
return;
@ -789,6 +861,7 @@ nodes with its data cleared (or user can just remove the StorageID
try {
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
// register new datanode
addDatanode(nodeDescr);
@ -799,10 +872,12 @@ nodes with its data cleared (or user can just remove the StorageID
// because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
success = true;
incrementVersionCount(nodeReg.getSoftwareVersion());
} finally {
if (!success) {
removeDatanode(nodeDescr);
wipeDatanode(nodeDescr);
countSoftwareVersions();
}
}
} catch (InvalidTopologyException e) {
@ -824,6 +899,7 @@ public void refreshNodes(final Configuration conf) throws IOException {
namesystem.writeLock();
try {
refreshDatanodes();
countSoftwareVersions();
} finally {
namesystem.writeUnlock();
}

View File

@ -359,6 +359,7 @@ public NamenodeStatus getNamenodeStatus(String props) throws IOException,
nn.httpAddress = httpAddress;
getLiveNodeCount(getProperty(props, "LiveNodes").getValueAsText(), nn);
getDeadNodeCount(getProperty(props, "DeadNodes").getValueAsText(), nn);
nn.softwareVersion = getProperty(props, "SoftwareVersion").getTextValue();
return nn;
}
@ -596,6 +597,7 @@ public void toXML(XMLOutputter doc) throws IOException {
toXmlItemBlockWithLink(doc, nn.deadDatanodeCount + " (" +
nn.deadDecomCount + ")", nn.httpAddress+"/dfsnodelist.jsp?whatNodes=DEAD"
, "Dead Datanode (Decommissioned)");
toXmlItemBlock(doc, "Software Version", nn.softwareVersion);
doc.endTag(); // node
}
doc.endTag(); // namenodes
@ -624,6 +626,7 @@ static class NamenodeStatus {
int deadDatanodeCount = 0;
int deadDecomCount = 0;
String httpAddress = null;
String softwareVersion = "";
}
/**

View File

@ -4804,7 +4804,21 @@ class SafeModeMonitor implements Runnable {
*/
@Override
public void run() {
while (fsRunning && (safeMode != null && !safeMode.canLeave())) {
while (fsRunning) {
writeLock();
try {
if (safeMode == null) { // Not in safe mode.
break;
}
if (safeMode.canLeave()) {
// Leave safe mode.
safeMode.leave();
break;
}
} finally {
writeUnlock();
}
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
@ -4813,9 +4827,6 @@ public void run() {
}
if (!fsRunning) {
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
} else {
// leave safe mode and stop the monitor
leaveSafeMode();
}
smmthread = null;
}
@ -6226,6 +6237,7 @@ public String getLiveNodes() {
innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
innerinfo.put("capacity", node.getCapacity());
innerinfo.put("numBlocks", node.numBlocks());
innerinfo.put("version", node.getSoftwareVersion());
info.put(node.getHostName(), innerinfo);
}
return JSON.toString(info);
@ -6437,6 +6449,22 @@ public String getCorruptFiles() {
return JSON.toString(list);
}
@Override //NameNodeMXBean
public int getDistinctVersionCount() {
return blockManager.getDatanodeManager().getDatanodesSoftwareVersions()
.size();
}
@Override //NameNodeMXBean
public Map<String, Integer> getDistinctVersions() {
return blockManager.getDatanodeManager().getDatanodesSoftwareVersions();
}
@Override //NameNodeMXBean
public String getSoftwareVersion() {
return VersionInfo.getVersion();
}
/**
* Verifies that the given identifier and password are valid and match.
* @param identifier Token identifier.

View File

@ -166,12 +166,14 @@ public static enum OperationCategory {
*/
public static final String[] NAMENODE_SPECIFIC_KEYS = {
DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_NAME_DIR_KEY,
DFS_NAMENODE_EDITS_DIR_KEY,
DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HTTP_ADDRESS_KEY,
DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
@ -387,6 +389,28 @@ protected InetSocketAddress getRpcServerAddress(Configuration conf) {
return getAddress(conf);
}
/** Given a configuration get the bind host of the service rpc server
* If the bind host is not configured returns null.
*/
protected String getServiceRpcServerBindHost(Configuration conf) {
String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
if (addr == null || addr.isEmpty()) {
return null;
}
return addr;
}
/** Given a configuration get the bind host of the client rpc server
* If the bind host is not configured returns null.
*/
protected String getRpcServerBindHost(Configuration conf) {
String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
if (addr == null || addr.isEmpty()) {
return null;
}
return addr;
}
/**
* Modifies the configuration passed to contain the service rpc address setting
*/

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -33,7 +35,13 @@ public interface NameNodeMXBean {
* @return the version
*/
public String getVersion();
/**
* Get the version of software running on the Namenode
* @return a string representing the version
*/
public String getSoftwareVersion();
/**
* Gets the used space by data nodes.
*
@ -215,4 +223,19 @@ public interface NameNodeMXBean {
* @return the list of corrupt files, as a JSON string.
*/
public String getCorruptFiles();
/**
* Get the number of distinct versions of live datanodes
*
* @return the number of distinct versions of live datanodes
*/
public int getDistinctVersionCount();
/**
* Get the number of live datanodes for each distinct versions
*
* @return the number of live datanodes for each distinct versions
*/
public Map<String, Integer> getDistinctVersions();
}

View File

@ -137,6 +137,7 @@
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
/**
@ -219,6 +220,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {
String bindHost = nn.getServiceRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = serviceRpcAddr.getHostName();
}
LOG.info("Service RPC server is binding to " + bindHost + ":" +
serviceRpcAddr.getPort());
int serviceHandlerCount =
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
@ -226,7 +234,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(serviceRpcAddr.getHostName())
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
@ -247,7 +255,10 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer);
serviceRPCAddress = serviceRpcServer.getListenerAddress();
// Update the address with the correct port
InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
serviceRPCAddress = new InetSocketAddress(
serviceRpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
} else {
serviceRpcServer = null;
@ -255,11 +266,17 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
}
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = rpcAddr.getHostName();
}
LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());
clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(rpcAddr.getHostName())
.setBindAddress(bindHost)
.setPort(rpcAddr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
@ -291,7 +308,9 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
}
// The rpc-server port can be ephemeral... ensure we have the correct info
clientRpcAddress = clientRpcServer.getListenerAddress();
InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
clientRpcAddress = new InetSocketAddress(
rpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServerAddress(conf, clientRpcAddress);
minimumDataNodeVersion = conf.get(
@ -315,6 +334,12 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
/** Allow access to the client RPC server for testing */
@VisibleForTesting
RPC.Server getClientRpcServer() {
return clientRpcServer;
}
/**
* Start client and service RPC servers.

View File

@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@ -100,6 +101,20 @@ static String getSecurityModeText() {
}
}
static String getRollingUpgradeText(FSNamesystem fsn) {
DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
Map<String, Integer> list = dm.getDatanodesSoftwareVersions();
if(list.size() > 1) {
StringBuffer status = new StringBuffer("Rolling upgrades in progress. " +
"There are " + list.size() + " versions of datanodes currently live: ");
for(Map.Entry<String, Integer> ver: list.entrySet()) {
status.append(ver.getKey() + "(" + ver.getValue() + "), ");
}
return status.substring(0, status.length()-2);
}
return "";
}
static String getInodeLimitText(FSNamesystem fsn) {
if (fsn == null) {
return "";
@ -803,7 +818,9 @@ void generateNodeData(JspWriter out, DatanodeDescriptor d, String suffix,
+ "<td align=\"right\" class=\"pcbpused\">"
+ percentBpUsed
+ "<td align=\"right\" class=\"volfails\">"
+ d.getVolumeFailures() + "\n");
+ d.getVolumeFailures()
+ "<td align=\"right\" class=\"version\">"
+ d.getSoftwareVersion() + "\n");
}
void generateNodesList(ServletContext context, JspWriter out,
@ -901,7 +918,9 @@ void generateNodesList(ServletContext context, JspWriter out,
+ nodeHeaderStr("pcbpused")
+ "> Block Pool<br>Used (%)" + " <th "
+ nodeHeaderStr("volfails")
+"> Failed Volumes\n");
+"> Failed Volumes <th "
+ nodeHeaderStr("versionString")
+"> Version\n");
JspHelper.sortNodeList(live, sorterField, sorterOrder);
for (int i = 0; i < live.size(); i++) {

View File

@ -51,6 +51,18 @@
</description>
</property>
<property>
<name>dfs.namenode.rpc-bind-host</name>
<value></value>
<description>
The actual address the server will bind to. If this optional address is
set, the RPC server will bind to this address and the port specified in
dfs.namenode.rpc-address for the RPC server. It can also be specified
per name node or name service for HA/Federation. This is most useful for
making name node listen to all interfaces by setting to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.servicerpc-address</name>
<value></value>
@ -64,6 +76,18 @@
</description>
</property>
<property>
<name>dfs.namenode.servicerpc-bind-host</name>
<value></value>
<description>
The actual address the server will bind to. If this optional address is
set, the service RPC server will bind to this address and the port
specified in dfs.namenode.servicerpc-address. It can also be specified
per name node or name service for HA/Federation. This is most useful for
making name node listen to all interfaces by setting to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>0.0.0.0:50090</value>

View File

@ -65,6 +65,7 @@
<h3>Cluster Summary</h3>
<b> <%= NamenodeJspHelper.getSecurityModeText()%> </b>
<b> <%= NamenodeJspHelper.getSafeModeText(fsn)%> </b>
<b> <%= NamenodeJspHelper.getRollingUpgradeText(fsn)%> </b>
<b> <%= NamenodeJspHelper.getInodeLimitText(fsn)%> </b>
<%= NamenodeJspHelper.getCorruptFilesWarning(fsn)%>

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.junit.Test;
import org.mockito.Mockito;
import org.mortbay.log.Log;
import static org.junit.Assert.*;
public class TestDatanodeManager {
//The number of times the registration / removal of nodes should happen
final int NUM_ITERATIONS = 500;
/**
* This test sends a random sequence of node registrations and node removals
* to the DatanodeManager (of nodes with different IDs and versions), and
* checks that the DatanodeManager keeps a correct count of different software
* versions at all times.
*/
@Test
public void testNumVersionsReportedCorrect() throws IOException {
//Create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, new Configuration());
//Seed the RNG with a known value so test failures are easier to reproduce
Random rng = new Random();
int seed = rng.nextInt();
rng = new Random(seed);
Log.info("Using seed " + seed + " for testing");
//A map of the Storage IDs to the DN registration it was registered with
HashMap <String, DatanodeRegistration> sIdToDnReg =
new HashMap<String, DatanodeRegistration>();
for(int i=0; i<NUM_ITERATIONS; ++i) {
//If true, remove a node for every 3rd time (if there's one)
if(rng.nextBoolean() && i%3 == 0 && sIdToDnReg.size()!=0) {
//Pick a random node.
int randomIndex = rng.nextInt() % sIdToDnReg.size();
//Iterate to that random position
Iterator<Map.Entry<String, DatanodeRegistration>> it =
sIdToDnReg.entrySet().iterator();
for(int j=0; j<randomIndex-1; ++j) {
it.next();
}
DatanodeRegistration toRemove = it.next().getValue();
Log.info("Removing node " + toRemove.getStorageID() + " ip " +
toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());
//Remove that random node
dm.removeDatanode(toRemove);
it.remove();
}
// Otherwise register a node. This node may be a new / an old one
else {
//Pick a random storageID to register.
String storageID = "someStorageID" + rng.nextInt(5000);
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getStorageID()).thenReturn(storageID);
//If this storageID had already been registered before
if(sIdToDnReg.containsKey(storageID)) {
dr = sIdToDnReg.get(storageID);
//Half of the times, change the IP address
if(rng.nextBoolean()) {
dr.setIpAddr(dr.getIpAddr() + "newIP");
}
} else { //This storageID has never been registered
//Ensure IP address is unique to storageID
String ip = "someIP" + storageID;
Mockito.when(dr.getIpAddr()).thenReturn(ip);
Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
Mockito.when(dr.getXferPort()).thenReturn(9000);
}
//Pick a random version to register with
Mockito.when(dr.getSoftwareVersion()).thenReturn(
"version" + rng.nextInt(5));
Log.info("Registering node storageID: " + dr.getStorageID() +
", version: " + dr.getSoftwareVersion() + ", IP address: "
+ dr.getXferAddr());
//Register this random node
dm.registerDatanode(dr);
sIdToDnReg.put(storageID, dr);
}
//Verify DatanodeManager still has the right count
Map<String, Integer> mapToCheck = dm.getDatanodesSoftwareVersions();
//Remove counts from versions and make sure that after removing all nodes
//mapToCheck is empty
for(Entry<String, DatanodeRegistration> it: sIdToDnReg.entrySet()) {
String ver = it.getValue().getSoftwareVersion();
if(!mapToCheck.containsKey(ver)) {
throw new AssertionError("The correct number of datanodes of a "
+ "version was not found on iteration " + i);
}
mapToCheck.put(ver, mapToCheck.get(ver) - 1);
if(mapToCheck.get(ver) == 0) {
mapToCheck.remove(ver);
}
}
for(Entry <String, Integer> entry: mapToCheck.entrySet()) {
Log.info("Still in map: " + entry.getKey() + " has "
+ entry.getValue());
}
assertEquals("The map of version counts returned by DatanodeManager was"
+ " not what it was expected to be on iteration " + i, 0,
mapToCheck.size());
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Test the MiniDFSCluster functionality that allows "dfs.datanode.address",
* "dfs.datanode.http.address", and "dfs.datanode.ipc.address" to be
* configurable. The MiniDFSCluster.startDataNodes() API now has a parameter
* that will check these properties if told to do so.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
public class TestNameNodeRpcServer {
@Test
public void testNamenodeRpcBindAny() throws IOException {
Configuration conf = new HdfsConfiguration();
// The name node in MiniDFSCluster only binds to 127.0.0.1.
// We can set the bind address to 0.0.0.0 to make it listen
// to all interfaces.
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY, "0.0.0.0");
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
assertEquals("0.0.0.0", ((NameNodeRpcServer)cluster.getNameNodeRpc())
.getClientRpcServer().getListenerAddress().getHostName());
} finally {
if (cluster != null) {
cluster.shutdown();
}
// Reset the config
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
}
}

View File

@ -243,6 +243,12 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
only after unregistering from the RM. (Jian He via vinodkv)
MAPREDUCE-5483. revert MAPREDUCE-5357. (rkanter via tucu)
MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM
commands to reboot, so that client can continue to track the overall job.
(Jian He via vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -993,7 +993,7 @@ public JobStateInternal getInternalState() {
}
}
private static JobState getExternalState(JobStateInternal smState) {
private JobState getExternalState(JobStateInternal smState) {
switch (smState) {
case KILL_WAIT:
case KILL_ABORT:
@ -1005,7 +1005,13 @@ private static JobState getExternalState(JobStateInternal smState) {
case FAIL_ABORT:
return JobState.FAILED;
case REBOOT:
return JobState.ERROR;
if (appContext.isLastAMRetry()) {
return JobState.ERROR;
} else {
// In case of not last retry, return the external state as RUNNING since
// otherwise JobClient will exit when it polls the AM for job state
return JobState.RUNNING;
}
default:
return JobState.valueOf(smState.name());
}

View File

@ -29,6 +29,7 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -41,6 +42,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -51,12 +54,15 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
/**
@ -368,6 +374,47 @@ public void testJobError() throws Exception {
app.waitForState(job, JobState.ERROR);
}
@Test
public void testJobRebootNotLastRetry() throws Exception {
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task task = it.next();
app.waitForState(task, TaskState.RUNNING);
//send an reboot event
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
JobEventType.JOB_AM_REBOOT));
// return exteranl state as RUNNING since otherwise the JobClient will
// prematurely exit.
app.waitForState(job, JobState.RUNNING);
}
@Test
public void testJobRebootOnLastRetry() throws Exception {
// make startCount as 2 since this is last retry which equals to
// DEFAULT_MAX_AM_RETRY
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task task = it.next();
app.waitForState(task, TaskState.RUNNING);
//send an reboot event
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
JobEventType.JOB_AM_REBOOT));
// return exteranl state as ERROR if this is the last retry
app.waitForState(job, JobState.ERROR);
}
private final class MRAppWithSpiedJob extends MRApp {
private JobImpl spiedJob;

View File

@ -142,7 +142,7 @@ public void testJobNoTasks() {
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
"tag1,tag2");
dispatcher.register(EventType.class, jseHandler);
JobImpl job = createStubbedJob(conf, dispatcher, 0);
JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(job.getID()));
@ -170,7 +170,7 @@ public void testCommitJobFailsJob() throws Exception {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
@ -195,7 +195,7 @@ public void testCheckJobCompleteSuccess() throws Exception {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
@ -239,7 +239,9 @@ public synchronized void setupJob(JobContext jobContext)
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
AppContext mockContext = mock(AppContext.class);
when(mockContext.isLastAMRetry()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -248,6 +250,10 @@ public synchronized void setupJob(JobContext jobContext)
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
assertJobState(job, JobStateInternal.REBOOT);
// return the external state as RUNNING since otherwise JobClient will
// exit when it polls the AM for job state
Assert.assertEquals(JobState.RUNNING, job.getState());
dispatcher.stop();
commitHandler.stop();
}
@ -256,6 +262,7 @@ public synchronized void setupJob(JobContext jobContext)
public void testRebootedDuringCommit() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@ -266,13 +273,18 @@ public void testRebootedDuringCommit() throws Exception {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
AppContext mockContext = mock(AppContext.class);
when(mockContext.isLastAMRetry()).thenReturn(true);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
syncBarrier.await();
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
assertJobState(job, JobStateInternal.REBOOT);
// return the external state as FAILED since this is last retry.
Assert.assertEquals(JobState.ERROR, job.getState());
dispatcher.stop();
commitHandler.stop();
}
@ -301,7 +313,7 @@ public synchronized void setupJob(JobContext jobContext)
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -328,7 +340,7 @@ public void testKilledDuringCommit() throws Exception {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
@ -352,7 +364,7 @@ public void testAbortJobCalledAfterKillingTasks() throws IOException {
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
//Fail one task. This should land the JobImpl in the FAIL_WAIT state
job.handle(new JobTaskEvent(
@ -388,7 +400,7 @@ public void testFailAbortDoesntHang() throws IOException {
//Job has only 1 mapper task. No reducers
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
//Fail / finish all the tasks. This should land the JobImpl directly in the
//FAIL_ABORT state
@ -440,7 +452,7 @@ public synchronized void abortJob(JobContext jobContext, State state)
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -477,7 +489,7 @@ public synchronized void abortJob(JobContext jobContext, State state)
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -687,7 +699,7 @@ public void testTransitionsAtFailed() throws IOException {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2);
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -735,12 +747,12 @@ public void runOnNextHeartbeat(Runnable callback) {
}
private static StubbedJob createStubbedJob(Configuration conf,
Dispatcher dispatcher, int numSplits) {
Dispatcher dispatcher, int numSplits, AppContext appContext) {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
StubbedJob job = new StubbedJob(jobId,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
dispatcher.register(JobEventType.class, job);
EventHandler mockHandler = mock(EventHandler.class);
dispatcher.register(TaskEventType.class, mockHandler);
@ -751,8 +763,8 @@ private static StubbedJob createStubbedJob(Configuration conf,
}
private static StubbedJob createRunningStubbedJob(Configuration conf,
Dispatcher dispatcher, int numSplits) {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
Dispatcher dispatcher, int numSplits, AppContext appContext) {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(job.getID()));
@ -880,13 +892,13 @@ protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine
}
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
boolean newApiCommitter, String user, int numSplits) {
Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
String user, int numSplits, AppContext appContext) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
MRAppMetrics.create(), null, newApiCommitter, user,
System.currentTimeMillis(), null, null, null, null);
System.currentTimeMillis(), null, appContext, null, null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

View File

@ -124,7 +124,6 @@ public static Path getStagingDir(Cluster cluster, Configuration conf)
} else {
fs.mkdirs(stagingArea,
new FsPermission(JOB_DIR_PERMISSION));
fs.setOwner(stagingArea, currentUser, null);
}
return stagingArea;
}

View File

@ -54,6 +54,18 @@ Release 2.1.1-beta - UNRELEASED
YARN-942. In Fair Scheduler documentation, inconsistency on which
properties have prefix (Akira Ajisaka via Sandy Ryza)
YARN-1083. Changed ResourceManager to fail when the expiry interval is less
than the configured node-heartbeat interval. (Zhijie Shen via vinodkv)
YARN-1081. Made a trivial change to YARN node CLI header to avoid potential
confusion. (Akira AJISAKA via vinodkv)
YARN-1034. Remove "experimental" in the Fair Scheduler documentation.
(Karthik Kambatla via Sandy Ryza)
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -105,6 +117,15 @@ Release 2.1.1-beta - UNRELEASED
YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same
key for allocations. (tucu)
YARN-981. Fixed YARN webapp so that /logs servlet works like before. (Jian He
via vinodkv)
YARN-602. Fixed NodeManager to not let users override some mandatory
environmental variables. (Kenji Kikushima via vinodkv)
YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
@ -1223,6 +1244,9 @@ Release 0.23.10 - UNRELEASED
YARN-337. RM handles killed application tracking URL poorly (jlowe)
YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -43,7 +43,7 @@
@Private
@Unstable
public class NodeCLI extends YarnCLI {
private static final String NODES_PATTERN = "%16s\t%15s\t%17s\t%18s" +
private static final String NODES_PATTERN = "%16s\t%15s\t%17s\t%28s" +
System.getProperty("line.separator");
private static final String NODE_STATE_CMD = "states";
@ -133,7 +133,7 @@ private void listClusterNodes(Set<NodeState> nodeStates)
nodeStates.toArray(new NodeState[0]));
writer.println("Total Nodes:" + nodesReport.size());
writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address",
"Running-Containers");
"Number-of-Running-Containers");
for (NodeReport nodeReport : nodesReport) {
writer.printf(NODES_PATTERN, nodeReport.getNodeId(), nodeReport
.getNodeState(), nodeReport.getHttpAddress(), nodeReport

View File

@ -543,9 +543,9 @@ public void testListClusterNodes() throws Exception {
PrintWriter pw = new PrintWriter(baos);
pw.println("Total Nodes:1");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888\t");
pw.println(" 0");
pw.close();
String nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -564,11 +564,11 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:2");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.print(" host1:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.print(" host1:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -593,9 +593,9 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:1");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t UNHEALTHY\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t UNHEALTHY\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -614,9 +614,9 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:1");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t DECOMMISSIONED\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t DECOMMISSIONED\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -635,9 +635,9 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:1");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t REBOOTED\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t REBOOTED\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -656,9 +656,9 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:1");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t LOST\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t LOST\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -681,17 +681,17 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:5");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.print(" host1:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t REBOOTED\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t LOST\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.print(" host1:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t REBOOTED\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t LOST\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());
@ -712,21 +712,21 @@ public void testListClusterNodes() throws Exception {
pw = new PrintWriter(baos);
pw.println("Total Nodes:7");
pw.print(" Node-Id\t Node-State\tNode-Http-Address\t");
pw.println("Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.print(" host1:0\t RUNNING\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t UNHEALTHY\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t DECOMMISSIONED\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t REBOOTED\t host1:8888");
pw.println("\t 0");
pw.print(" host0:0\t LOST\t host1:8888");
pw.println("\t 0");
pw.println("Number-of-Running-Containers");
pw.print(" host0:0\t NEW\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.print(" host1:0\t RUNNING\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t UNHEALTHY\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t DECOMMISSIONED\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t REBOOTED\t host1:8888\t");
pw.println(" 0");
pw.print(" host0:0\t LOST\t host1:8888\t");
pw.println(" 0");
pw.close();
nodesReportStr = baos.toString("UTF-8");
Assert.assertEquals(nodesReportStr, sysOutStream.toString());

View File

@ -72,10 +72,18 @@ public int run(String[] args) throws Exception {
+ "nodename:port (must be specified if container id is specified)");
opts.addOption(APP_OWNER_OPTION, true,
"AppOwner (assumed to be current user if not specified)");
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
Options printOpts = new Options();
printOpts.addOption(opts.getOption(CONTAINER_ID_OPTION));
printOpts.addOption(opts.getOption(NODE_ADDRESS_OPTION));
printOpts.addOption(opts.getOption(APP_OWNER_OPTION));
if (args.length < 1) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
printHelpMessage(printOpts);
return -1;
}
@ -92,16 +100,13 @@ public int run(String[] args) throws Exception {
appOwner = commandLine.getOptionValue(APP_OWNER_OPTION);
} catch (ParseException e) {
System.out.println("options parsing failed: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
printHelpMessage(printOpts);
return -1;
}
if (appIdStr == null) {
System.out.println("ApplicationId cannot be null!");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
printHelpMessage(printOpts);
return -1;
}
@ -119,8 +124,7 @@ public int run(String[] args) throws Exception {
} else if ((containerIdStr == null && nodeAddress != null)
|| (containerIdStr != null && nodeAddress == null)) {
System.out.println("ContainerId or NodeAddress cannot be null!");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
printHelpMessage(printOpts);
resultCode = -1;
} else {
Path remoteRootLogDir =
@ -255,4 +259,12 @@ public static void main(String[] args) throws Exception {
int exitCode = logDumper.run(args);
System.exit(exitCode);
}
private void printHelpMessage(Options options) {
System.out.println("Retrieve logs for completed YARN applications.");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("yarn logs -applicationId <application ID> [OPTIONS]", new Options());
formatter.setSyntaxPrefix("");
formatter.printHelp("general options are:", options);
}
}

View File

@ -242,7 +242,10 @@ public void setup() {
for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue());
}
server.addGlobalFilter("guice", GuiceFilter.class.getName(), null);
String webAppPath = "/" + name + "/*";
server.defineFilter(server.getWebAppContext(), "guice",
GuiceFilter.class.getName(), null, new String[] { webAppPath, "/" });
webapp.setConf(conf);
webapp.setHttpServer(server);
server.start();

View File

@ -19,14 +19,30 @@
package org.apache.hadoop.yarn.logaggregation;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
public class TestLogDumper {
ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
@Before
public void setUp() {
sysOutStream = new ByteArrayOutputStream();
sysOut = new PrintStream(sysOutStream);
System.setOut(sysOut);
}
@Test
public void testFailResultCodes() throws Exception {
Configuration conf = new YarnConfiguration();
@ -44,4 +60,30 @@ public void testFailResultCodes() throws Exception {
"nonexistentnode:1234", "nobody");
assertTrue("Should return an error code", exitCode != 0);
}
@Test
public void testHelpMessage() throws Exception {
Configuration conf = new YarnConfiguration();
LogDumper dumper = new LogDumper();
dumper.setConf(conf);
int exitCode = dumper.run(new String[]{});
assertTrue(exitCode == -1);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Retrieve logs for completed YARN applications.");
pw.println("usage: yarn logs -applicationId <application ID> [OPTIONS]");
pw.println();
pw.println("general options are:");
pw.println(" -appOwner <Application Owner> AppOwner (assumed to be current user if");
pw.println(" not specified)");
pw.println(" -containerId <Container ID> ContainerId (must be specified if node");
pw.println(" address is specified)");
pw.println(" -nodeAddress <Node Address> NodeAddress in the format nodename:port");
pw.println(" (must be specified if container id is");
pw.println(" specified)");
pw.close();
String appReportStr = baos.toString("UTF-8");
Assert.assertEquals(appReportStr, sysOutStream.toString());
}
}

View File

@ -588,20 +588,18 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
environment.put(Environment.LOG_DIRS.name(),
StringUtils.join(",", containerLogDirs));
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
environment.put(Environment.USER.name(), container.getUser());
putEnvIfNotNull(environment,
Environment.LOGNAME.name(),container.getUser());
putEnvIfNotNull(environment,
Environment.HOME.name(),
environment.put(Environment.LOGNAME.name(), container.getUser());
environment.put(Environment.HOME.name(),
conf.get(
YarnConfiguration.NM_USER_HOME_DIR,
YarnConfiguration.DEFAULT_NM_USER_HOME_DIR
)
);
putEnvIfNotNull(environment, Environment.PWD.name(), pwd.toString());
environment.put(Environment.PWD.name(), pwd.toString());
putEnvIfNotNull(environment,
Environment.HADOOP_CONF_DIR.name(),

View File

@ -346,7 +346,6 @@ public void testContainerEnvVariables() throws Exception {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
int port = 12345;
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
Map<String, String> userSetEnv = new HashMap<String, String>();
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
@ -354,6 +353,11 @@ public void testContainerEnvVariables() throws Exception {
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
userSetEnv.put(Environment.USER.key(), "user_set_" +
Environment.USER.key());
userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
userSetEnv.put(Environment.PWD.name(), "user_set_PWD");
userSetEnv.put(Environment.HOME.name(), "user_set_HOME");
containerLaunchContext.setEnvironment(userSetEnv);
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@ -371,6 +375,14 @@ public void testContainerEnvVariables() throws Exception {
+ processStartFile);
fileWriter.println("@echo " + Environment.LOCAL_DIRS.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.USER.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.LOGNAME.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.PWD.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.HOME.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + cId + ">> " + processStartFile);
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
@ -385,6 +397,14 @@ public void testContainerEnvVariables() throws Exception {
+ processStartFile);
fileWriter.write("\necho $" + Environment.LOCAL_DIRS.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.USER.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.LOGNAME.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.PWD.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.HOME.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 100");
}
@ -452,6 +472,22 @@ public void testContainerEnvVariables() throws Exception {
reader.readLine());
Assert.assertEquals(String.valueOf(HTTP_PORT), reader.readLine());
Assert.assertEquals(StringUtils.join(",", appDirs), reader.readLine());
Assert.assertEquals(user, reader.readLine());
Assert.assertEquals(user, reader.readLine());
String obtainedPWD = reader.readLine();
boolean found = false;
for (Path localDir : appDirs) {
if (new Path(localDir, cId.toString()).toString().equals(obtainedPWD)) {
found = true;
break;
}
}
Assert.assertTrue("Wrong local-dir found : " + obtainedPWD, found);
Assert.assertEquals(
conf.get(
YarnConfiguration.NM_USER_HOME_DIR,
YarnConfiguration.DEFAULT_NM_USER_HOME_DIR),
reader.readLine());
Assert.assertEquals(cId.toString(), containerLaunchContext
.getEnvironment().get(Environment.CONTAINER_ID.name()));
@ -465,6 +501,26 @@ public void testContainerEnvVariables() throws Exception {
.getEnvironment().get(Environment.LOCAL_DIRS.name()));
Assert.assertEquals(StringUtils.join(",", containerLogDirs),
containerLaunchContext.getEnvironment().get(Environment.LOG_DIRS.name()));
Assert.assertEquals(user, containerLaunchContext.getEnvironment()
.get(Environment.USER.name()));
Assert.assertEquals(user, containerLaunchContext.getEnvironment()
.get(Environment.LOGNAME.name()));
found = false;
obtainedPWD =
containerLaunchContext.getEnvironment().get(Environment.PWD.name());
for (Path localDir : appDirs) {
if (new Path(localDir, cId.toString()).toString().equals(obtainedPWD)) {
found = true;
break;
}
}
Assert.assertTrue("Wrong local-dir found : " + obtainedPWD, found);
Assert.assertEquals(
conf.get(
YarnConfiguration.NM_USER_HOME_DIR,
YarnConfiguration.DEFAULT_NM_USER_HOME_DIR),
containerLaunchContext.getEnvironment()
.get(Environment.HOME.name()));
// Get the pid of the process
String pid = reader.readLine().trim();
@ -538,7 +594,6 @@ public void testDelayedKill() throws Exception {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
int port = 12345;
// upload the script file so that the container can run it
URL resource_alpha =

View File

@ -365,6 +365,20 @@ protected static void validateConfigs(Configuration conf) {
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
}
// validate expireIntvl >= heartbeatIntvl
long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
long heartbeatIntvl =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (expireIntvl < heartbeatIntvl) {
throw new YarnRuntimeException("Nodemanager expiry interval should be no"
+ " less than heartbeat interval, "
+ YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
+ ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
+ heartbeatIntvl);
}
}
@Private

View File

@ -393,9 +393,18 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
}
}
private void updateMetricsForDeactivatedNode(NodeState finalState) {
private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.decrNumActiveNodes();
switch (initialState) {
case RUNNING:
metrics.decrNumActiveNodes();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
}
switch (finalState) {
case DECOMMISSIONED:
@ -505,7 +514,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
if (!rmNode.getState().equals(NodeState.UNHEALTHY)) {
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
@ -520,7 +530,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(finalState);
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
}
@ -550,7 +560,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Update metrics
rmNode.updateMetricsForDeactivatedNode(NodeState.UNHEALTHY);
rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
NodeState.UNHEALTHY);
return NodeState.UNHEALTHY;
}

View File

@ -267,7 +267,21 @@ public void testRunningExpire() {
@Test
public void testUnhealthyExpire() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.LOST, node.getState());
}
@ -291,8 +305,22 @@ public void testRunningDecommission() {
@Test
public void testUnhealthyDecommission() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@ -307,8 +335,22 @@ public void testRunningRebooting() {
@Test
public void testUnhealthyRebooting() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.REBOOTING));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted + 1, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}

View File

@ -203,4 +203,21 @@ public void testResourceManagerInitConfigValidation() throws Exception {
}
}
@Test
public void testNMExpiryAndHeartbeatIntervalsValidation() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1001);
resourceManager = new ResourceManager();;
try {
resourceManager.init(conf);
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith("Nodemanager expiry interval should be no"
+ " less than heartbeat interval")) {
throw e;
}
}
}
}

View File

@ -25,8 +25,7 @@ Hadoop MapReduce Next Generation - Fair Scheduler
* {Purpose}
This document describes the <<<FairScheduler>>>, a pluggable scheduler for Hadoop
which provides a way to share large clusters. <<NOTE:>> The Fair Scheduler
implementation is currently under development and should be considered experimental.
that allows YARN applications to share resources in large clusters fairly.
* {Introduction}