Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1203781 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-11-18 18:29:26 +00:00
commit 67c372991c
65 changed files with 1306 additions and 359 deletions

View File

@ -110,6 +110,9 @@ Release 0.23.1 - Unreleased
HADOOP-7801. HADOOP_PREFIX cannot be overriden. (Bruno Mahé via tomwhite)
HADOOP-7802. Hadoop scripts unconditionally source
"$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite)
OPTIMIZATIONS
BUG FIXES
@ -117,6 +120,9 @@ Release 0.23.1 - Unreleased
HADOOP-7811. TestUserGroupInformation#testGetServerSideGroups test fails in chroot.
(Jonathan Eagles via mahadev)
HADOOP-7787. Make source tarball use conventional name.
(Bruno Mahé via tomwhite)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -21,7 +21,9 @@ bin=`which $0`
bin=`dirname ${bin}`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
function print_usage(){
echo "Usage: hadoop [--config confdir] COMMAND"

View File

@ -39,7 +39,9 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# get arguments

View File

@ -29,6 +29,8 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
exec "$bin/slaves.sh" --config $HADOOP_CONF_DIR cd "$HADOOP_PREFIX" \; "$bin/hadoop-daemon.sh" --config $HADOOP_CONF_DIR "$@"

View File

@ -30,7 +30,9 @@
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then
. "${HADOOP_CONF_DIR}/hadoop-env.sh"

View File

@ -38,7 +38,9 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then
. "${HADOOP_CONF_DIR}/hadoop-env.sh"

View File

@ -23,7 +23,9 @@ echo "This script is Deprecated. Instead use start-dfs.sh and start-mapred.sh"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# start hdfs daemons if hdfs is present
if [ -f "${HADOOP_HDFS_HOME}"/bin/start-dfs.sh ]; then

View File

@ -23,7 +23,9 @@ echo "This script is Deprecated. Instead use stop-dfs.sh and stop-mapred.sh"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# stop hdfs daemons if hdfs is present
if [ -f "${HADOOP_HDFS_HOME}"/bin/stop-dfs.sh ]; then

View File

@ -24,7 +24,9 @@ if [ "$HADOOP_HOME" != "" ]; then
echo
fi
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
usage() {
echo "

View File

@ -19,7 +19,9 @@ bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)
script="$(basename -- "$this")"
this="$bin/$script"
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
usage() {
echo "

View File

@ -504,7 +504,10 @@ if [ "${AUTOSETUP}" == "1" -o "${AUTOSETUP}" == "y" ]; then
fi
chmod 755 -R ${HADOOP_PREFIX}/sbin/*hadoop*
chmod 755 -R ${HADOOP_PREFIX}/bin/hadoop
chmod 755 -R ${HADOOP_PREFIX}/libexec/hadoop-config.sh
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-${HADOOP_PREFIX}/libexec}
chmod 755 -R ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh
mkdir -p /home/${HADOOP_MR_USER}
chown ${HADOOP_MR_USER}:${HADOOP_GROUP} /home/${HADOOP_MR_USER}
HDFS_DIR=`echo ${HADOOP_HDFS_DIR} | sed -e 's/,/ /g'`

View File

@ -18,7 +18,9 @@
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
usage() {
echo "

View File

@ -25,7 +25,9 @@ if [ "$HADOOP_HOME" != "" ]; then
echo
fi
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
usage() {
echo "

View File

@ -31,7 +31,9 @@
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
usage() {
echo "

View File

@ -0,0 +1,146 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.util;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLDecoder;
import java.text.MessageFormat;
import java.util.Enumeration;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
* Finds the Jar for a class. If the class is in a directory in the
* classpath, it creates a Jar on the fly with the contents of the directory
* and returns the path to that Jar. If a Jar is created, it is created in
* the system temporary directory.
*/
public class JarFinder {
private static void zipDir(File dir, String relativePath, ZipOutputStream zos)
throws IOException {
Preconditions.checkNotNull(relativePath, "relativePath");
Preconditions.checkNotNull(zos, "zos");
zipDir(dir, relativePath, zos, true);
zos.close();
}
private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
boolean start) throws IOException {
String[] dirList = dir.list();
for (String aDirList : dirList) {
File f = new File(dir, aDirList);
if (!f.isHidden()) {
if (f.isDirectory()) {
if (!start) {
ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
zos.putNextEntry(dirEntry);
zos.closeEntry();
}
String filePath = f.getPath();
File file = new File(filePath);
zipDir(file, relativePath + f.getName() + "/", zos, false);
}
else {
ZipEntry anEntry = new ZipEntry(relativePath + f.getName());
zos.putNextEntry(anEntry);
InputStream is = new FileInputStream(f);
byte[] arr = new byte[4096];
int read = is.read(arr);
while (read > -1) {
zos.write(arr, 0, read);
read = is.read(arr);
}
is.close();
zos.closeEntry();
}
}
}
}
private static void createJar(File dir, File jarFile) throws IOException {
Preconditions.checkNotNull(dir, "dir");
Preconditions.checkNotNull(jarFile, "jarFile");
File jarDir = jarFile.getParentFile();
if (!jarDir.exists()) {
if (!jarDir.mkdirs()) {
throw new IOException(MessageFormat.format("could not create dir [{0}]",
jarDir));
}
}
JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile),
new Manifest());
zipDir(dir, "", zos);
}
/**
* Returns the full path to the Jar containing the class. It always return a
* JAR.
*
* @param klass class.
*
* @return path to the Jar containing the class.
*/
public static String getJar(Class klass) {
Preconditions.checkNotNull(klass, "klass");
ClassLoader loader = klass.getClassLoader();
if (loader != null) {
String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
try {
for (Enumeration itr = loader.getResources(class_file);
itr.hasMoreElements(); ) {
URL url = (URL) itr.nextElement();
String path = url.getPath();
if (path.startsWith("file:")) {
path = path.substring("file:".length());
}
path = URLDecoder.decode(path, "UTF-8");
if ("jar".equals(url.getProtocol())) {
path = URLDecoder.decode(path, "UTF-8");
return path.replaceAll("!.*$", "");
}
else if ("file".equals(url.getProtocol())) {
String klassName = klass.getName();
klassName = klassName.replace(".", "/") + ".class";
path = path.substring(0, path.length() - klassName.length());
File baseDir = new File(path);
File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
testDir = testDir.getAbsoluteFile();
if (!testDir.exists()) {
testDir.mkdirs();
}
File tempJar = File.createTempFile("hadoop-", "", testDir);
tempJar = new File(tempJar.getAbsolutePath() + ".jar");
createJar(baseDir, tempJar);
return tempJar.getAbsolutePath();
}
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return null;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
public class TestJarFinder {
@Test
public void testAppend() throws Exception {
//picking a class that is for sure in a JAR in the classpath
String jar = JarFinder.getJar(LogFactory.class);
Assert.assertTrue(new File(jar).exists());
//picking a class that is for sure in a directory in the classpath
//in this case the JAR is created on the fly
jar = JarFinder.getJar(TestJarFinder.class);
Assert.assertTrue(new File(jar).exists());
}
}

View File

@ -107,6 +107,15 @@ Release 0.23.1 - UNRELEASED
NEW FEATURES
IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
HDFS-2544. Hadoop scripts unconditionally source
"$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite)
HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite)
HDFS-2562. Refactor DN configuration variables out of DataNode class
(todd)
OPTIMIZATIONS

View File

@ -16,9 +16,7 @@
# limitations under the License.
#
if [ "$HADOOP_PREFIX" = "" ]; then
export HADOOP_PREFIX=/usr/local/share/hadoop
fi
export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr/local/share/hadoop}
if [ "$OS_ARCH" = "" ]; then
export OS_ARCH=amd64

View File

@ -36,7 +36,9 @@
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin/../libexec/hdfs-config.sh"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
if [ "$1" = '' ] ; then
"Error: please specify local exclude file as a first argument"

View File

@ -19,7 +19,9 @@ bin=`which $0`
bin=`dirname ${bin}`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
function print_usage(){
echo "Usage: hdfs [--config confdir] COMMAND"

View File

@ -24,8 +24,10 @@ bin=`cd "$bin"; pwd`
export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}"
if [ -e "$bin/../libexec/hadoop-config.sh" ]; then
. $bin/../libexec/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then
. ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh
elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then
. "$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh
elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then

View File

@ -23,7 +23,9 @@
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin/../libexec/hdfs-config.sh"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
namenodes=$("$HADOOP_PREFIX/bin/hdfs" getconf -nnRpcAddresses)
if [ "$?" != '0' ] ; then errorFlag='1' ;

View File

@ -18,7 +18,9 @@
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
# Start balancer daemon.

View File

@ -25,7 +25,9 @@ usage="Usage: start-dfs.sh [-upgrade|-rollback]"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
# get arguments
if [ $# -ge 1 ]; then

View File

@ -22,7 +22,9 @@ usage="Usage (run as root in order to start secure datanodes): start-secure-dns.
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
if [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
"$HADOOP_PREFIX"/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs start datanode $dataStartOpt

View File

@ -18,7 +18,9 @@
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
# Stop balancer daemon.
# Run this on the machine where the balancer is running

View File

@ -18,7 +18,9 @@
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
#---------------------------------------------------------
# namenodes

View File

@ -22,7 +22,9 @@ usage="Usage (run as root in order to stop secure datanodes): stop-secure-dns.sh
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/../libexec/hdfs-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hdfs-config.sh
if [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
"$HADOOP_PREFIX"/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs stop datanode

View File

@ -55,6 +55,10 @@ synchronized BlockTokenSecretManager get(String bpid) {
}
return secretMgr;
}
public synchronized boolean isBlockPoolRegistered(String bpid) {
return map.containsKey(bpid);
}
/** Return an empty BlockTokenIdentifer */
@Override

View File

@ -185,8 +185,8 @@ class BlockReceiver implements Closeable {
" while receiving block " + block + " from " + inAddr);
}
}
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@ -249,7 +249,7 @@ public void close() throws IOException {
try {
if (checksumOut != null) {
checksumOut.flush();
if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
((FileOutputStream)cout).getChannel().force(true);
}
checksumOut.close();
@ -265,7 +265,7 @@ public void close() throws IOException {
try {
if (out != null) {
out.flush();
if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
((FileOutputStream)out).getChannel().force(true);
}
out.close();
@ -435,7 +435,7 @@ private void readNextPacket() throws IOException {
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
+ chunkSize - 1)/chunkSize;
buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
Math.max(chunksPerPacket, 1) * chunkSize);

View File

@ -185,8 +185,8 @@ class BlockSender implements java.io.Closeable {
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
this.readaheadLength = datanode.getReadaheadLength();
this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
this.readaheadLength = datanode.getDnConf().readaheadLength;
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
synchronized(datanode.data) {
this.replica = getReplica(block, datanode);
@ -215,7 +215,7 @@ class BlockSender implements java.io.Closeable {
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
// use normal transfer in those cases
this.transferToAllowed = datanode.transferToAllowed &&
this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
(!is32Bit || length <= Integer.MAX_VALUE);
DataChecksum csum;

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/**
* Simple class encapsulating all of the configuration that the DataNode
* loads at startup time.
*/
class DNConf {
final int socketTimeout;
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
final boolean transferToAllowed;
final boolean dropCacheBehindWrites;
final boolean syncBehindWrites;
final boolean dropCacheBehindReads;
final boolean syncOnClose;
final long readaheadLength;
final long heartBeatInterval;
final long blockReportInterval;
final long deleteReportInterval;
final long initialBlockReportDelay;
final int writePacketSize;
public DNConf(Configuration conf) {
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
socketKeepaliveTimeout = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
/* Based on results on different platforms, we might need set the default
* to false on some of them. */
transferToAllowed = conf.getBoolean(
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
readaheadLength = conf.getLong(
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
dropCacheBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
syncBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
if (initBRDelay >= blockReportInterval) {
initBRDelay = 0;
DataNode.LOG.info("dfs.blockreport.initialDelay is greater than " +
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
initialBlockReportDelay = initBRDelay;
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
this.deleteReportInterval = 100 * heartBeatInterval;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
}
}

View File

@ -19,15 +19,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
@ -51,17 +44,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.Util.now;
@ -104,7 +90,6 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -136,6 +121,7 @@
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@ -275,7 +261,7 @@ class BlockPoolManager {
List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
for(InetSocketAddress isa : isas) {
BPOfferService bpos = new BPOfferService(isa);
BPOfferService bpos = new BPOfferService(isa, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
}
@ -373,19 +359,19 @@ void refreshNamenodes(Configuration conf)
}
for (InetSocketAddress nnaddr : toStart) {
BPOfferService bpos = new BPOfferService(nnaddr);
BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
for (BPOfferService bpos : toShutdown) {
remove(bpos);
}
}
for (BPOfferService bpos : toShutdown) {
bpos.stop();
bpos.join();
}
// stoping the BPOSes causes them to call remove() on their own when they
// clean up.
// Now start the threads that are not already running.
startAll();
}
@ -401,12 +387,7 @@ void refreshNamenodes(Configuration conf)
AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
long blockReportInterval;
boolean resetBlockReportTime = true;
long deleteReportInterval;
long lastDeletedReport = 0;
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
long heartBeatInterval;
private DNConf dnConf;
private boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
@ -416,18 +397,9 @@ void refreshNamenodes(Configuration conf)
private volatile String hostName; // Host name of this datanode
private static String dnThreadName;
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
private boolean dropCacheBehindWrites = false;
private boolean syncBehindWrites = false;
private boolean dropCacheBehindReads = false;
private long readaheadLength = 0;
int writePacketSize = 0;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
boolean syncOnClose;
public DataBlockScanner blockScanner = null;
private DirectoryScanner directoryScanner = null;
@ -495,51 +467,6 @@ private static String getHostName(Configuration config)
return name;
}
private void initConfig(Configuration conf) {
this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
/* Based on results on different platforms, we might need set the default
* to false on some of them. */
this.transferToAllowed = conf.getBoolean(
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.readaheadLength = conf.getLong(
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
this.dropCacheBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
this.syncBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
this.dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.initialBlockReportDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
if (this.initialBlockReportDelay >= blockReportInterval) {
this.initialBlockReportDelay = 0;
LOG.info("dfs.blockreport.initialDelay is greater than " +
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
this.deleteReportInterval = 100 * heartBeatInterval;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
}
private void startInfoServer(Configuration conf) throws IOException {
// create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
@ -653,6 +580,7 @@ private synchronized void initDataBlockScanner(Configuration conf) {
return;
}
String reason = null;
assert data != null;
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration";
@ -709,7 +637,7 @@ private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
ServerSocket ss;
if(secureResources == null) {
ss = (socketWriteTimeout > 0) ?
ss = (dnConf.socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr, 0);
} else {
@ -774,11 +702,15 @@ void setHeartbeatsDisabledForTests(
* </ul>
*/
@InterfaceAudience.Private
class BPOfferService implements Runnable {
static class BPOfferService implements Runnable {
final InetSocketAddress nnAddr;
DatanodeRegistration bpRegistration;
NamespaceInfo bpNSInfo;
long lastBlockReport = 0;
long lastDeletedReport = 0;
boolean resetBlockReportTime = true;
private Thread bpThread;
private DatanodeProtocol bpNamenode;
private String blockPoolId;
@ -788,14 +720,15 @@ class BPOfferService implements Runnable {
= new LinkedList<ReceivedDeletedBlockInfo>();
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
private boolean isBlockTokenInitialized = false;
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
private final DNConf dnConf;
BPOfferService(InetSocketAddress isa) {
this.bpRegistration = new DatanodeRegistration(getMachineName());
bpRegistration.setInfoPort(infoServer.getPort());
bpRegistration.setIpcPort(getIpcPort());
this.nnAddr = isa;
BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
this.dn = dn;
this.bpRegistration = dn.createRegistration();
this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf();
}
/**
@ -822,7 +755,6 @@ private InetSocketAddress getNNSocketAddress() {
void setNamespaceInfo(NamespaceInfo nsinfo) {
bpNSInfo = nsinfo;
this.blockPoolId = nsinfo.getBlockPoolID();
blockPoolManager.addBlockPool(this);
}
void setNameNode(DatanodeProtocol dnProtocol) {
@ -831,7 +763,7 @@ void setNameNode(DatanodeProtocol dnProtocol) {
private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
nsInfo = bpNamenode.versionRequest();
// verify build version
@ -867,7 +799,7 @@ private NamespaceInfo handshake() throws IOException {
return nsInfo;
}
void setupBP(Configuration conf, AbstractList<File> dataDirs)
void setupBP(Configuration conf)
throws IOException {
// get NN proxy
DatanodeProtocol dnp =
@ -878,52 +810,19 @@ void setupBP(Configuration conf, AbstractList<File> dataDirs)
// handshake with NN
NamespaceInfo nsInfo = handshake();
setNamespaceInfo(nsInfo);
synchronized(DataNode.this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setupBPStorage();
setClusterId(nsInfo.clusterID);
}
initPeriodicScanners(conf);
}
void setupBPStorage() throws IOException {
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
dn.initBlockPool(this, nsInfo);
if (simulatedFSDataset) {
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageID(getStorageId()); //same as DN
bpRegistration.setStorageID(dn.getStorageId());
StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
bpRegistration.setStorageInfo(nsInfo);
} else {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
dataDirs, startOpt);
LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+ blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+ bpNSInfo);
bpRegistration.setStorageID(getStorageId());
bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageInfo(storageInfo);
}
data.addBlockPool(blockPoolId, conf);
}
/**
* This methods arranges for the data node to send the block report at
* the next heartbeat.
@ -931,9 +830,9 @@ void setupBPStorage() throws IOException {
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = System.currentTimeMillis()
- ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - blockReportInterval;
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
}
resetBlockReportTime = true; // reset future BRs for randomness
}
@ -1038,11 +937,11 @@ DatanodeCommand blockReport() throws IOException {
// send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
if (startTime - lastBlockReport > dnConf.blockReportInterval) {
// Create block report
long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
// Send block report
long brSendStartTime = now();
@ -1052,7 +951,7 @@ DatanodeCommand blockReport() throws IOException {
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
metrics.addBlockReport(brSendCost);
dn.metrics.addBlockReport(brSendCost);
LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ " blocks took " + brCreateCost + " msec to generate and "
+ brSendCost + " msecs for RPC and NN processing");
@ -1060,7 +959,7 @@ DatanodeCommand blockReport() throws IOException {
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval));
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
@ -1070,7 +969,7 @@ DatanodeCommand blockReport() throws IOException {
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) /
blockReportInterval * blockReportInterval;
dnConf.blockReportInterval * dnConf.blockReportInterval;
}
LOG.info("sent block report, processed command:" + cmd);
}
@ -1080,12 +979,12 @@ DatanodeCommand blockReport() throws IOException {
DatanodeCommand [] sendHeartBeat() throws IOException {
return bpNamenode.sendHeartbeat(bpRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
data.getBlockPoolUsed(blockPoolId),
xmitsInProgress.get(),
getXceiverCount(), data.getNumFailedVolumes());
dn.data.getCapacity(),
dn.data.getDfsUsed(),
dn.data.getRemaining(),
dn.data.getBlockPoolUsed(blockPoolId),
dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes());
}
//This must be called only by blockPoolManager
@ -1121,21 +1020,9 @@ private synchronized void cleanUp() {
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
blockPoolManager.remove(this);
shouldServiceRun = false;
RPC.stopProxy(bpNamenode);
if (blockScanner != null) {
blockScanner.removeBlockPool(this.getBlockPoolId());
}
if (data != null) {
data.shutdownBlockPool(this.getBlockPoolId());
}
if (storage != null) {
storage.removeBlockPoolStorage(this.getBlockPoolId());
}
dn.shutdownBlockPool(this);
}
/**
@ -1144,22 +1031,22 @@ private synchronized void cleanUp() {
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ blockReportInterval + "msec" + " Initial delay: "
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ heartBeatInterval);
+ dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dnConf.blockReportInterval + "msec" + " Initial delay: "
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dnConf.heartBeatInterval);
//
// Now loop for a long time....
//
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report
//
if (startTime - lastHeartbeat > heartBeatInterval) {
if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
@ -1168,9 +1055,9 @@ private void offerService() throws Exception {
// -- Bytes remaining
//
lastHeartbeat = startTime;
if (!heartbeatsDisabledForTests) {
if (!dn.heartbeatsDisabledForTests) {
DatanodeCommand[] cmds = sendHeartBeat();
metrics.addHeartbeat(now() - startTime);
dn.metrics.addHeartbeat(now() - startTime);
long startProcessCommands = now();
if (!processCommand(cmds))
@ -1183,7 +1070,7 @@ private void offerService() throws Exception {
}
}
if (pendingReceivedRequests > 0
|| (startTime - lastDeletedReport > deleteReportInterval)) {
|| (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
}
@ -1192,15 +1079,15 @@ private void offerService() throws Exception {
processCommand(cmd);
// Now safe to start scanning the block pool
if (blockScanner != null) {
blockScanner.addBlockPool(this.blockPoolId);
if (dn.blockScanner != null) {
dn.blockScanner.addBlockPool(this.blockPoolId);
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval -
long waitTime = dnConf.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedAndDeletedBlockList) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
@ -1223,7 +1110,7 @@ private void offerService() throws Exception {
}
LOG.warn("RemoteException in offerService", re);
try {
long sleepTime = Math.min(1000, heartBeatInterval);
long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@ -1269,7 +1156,7 @@ void register() throws IOException {
(bpNSInfo.getLayoutVersion(), "namenode");
}
while(shouldRun && shouldServiceRun) {
while(dn.shouldRun && shouldServiceRun) {
try {
// Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
@ -1277,8 +1164,6 @@ void register() throws IOException {
LOG.info("bpReg after =" + bpRegistration.storageInfo +
";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
NetUtils.getHostname();
hostName = bpRegistration.getHost();
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
@ -1287,47 +1172,13 @@ void register() throws IOException {
} catch (InterruptedException ie) {}
}
}
if (storage.getStorageID().equals("")) {
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
if (!isBlockTokenInitialized) {
/* first time registering with NN */
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
isBlockTokenInitialized = true;
}
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay);
scheduleBlockReport(dnConf.initialBlockReportDelay);
}
@ -1341,14 +1192,14 @@ void register() throws IOException {
*/
@Override
public void run() {
LOG.info(bpRegistration + "In BPOfferService.run, data = " + data
LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
+ ";bp=" + blockPoolId);
try {
// init stuff
try {
// setup storage
setupBP(conf, dataDirs);
setupBP(dn.conf);
register();
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
@ -1360,13 +1211,13 @@ public void run() {
initialized = true; // bp is initialized;
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService", ex);
if (shouldRun && shouldServiceRun) {
if (dn.shouldRun && shouldServiceRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
@ -1379,7 +1230,7 @@ public void run() {
LOG.warn("Unexpected exception", ex);
} finally {
LOG.warn(bpRegistration + " ending block pool service for: "
+ blockPoolId);
+ blockPoolId + " thread " + Thread.currentThread().getId());
cleanUp();
}
}
@ -1420,8 +1271,8 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
metrics.incrBlocksReplicated(bcmd.getBlocks().length);
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
//
@ -1430,16 +1281,16 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
//
Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
if (dn.blockScanner != null) {
dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
}
// using global fsdataset
data.invalidate(bcmd.getBlockPoolId(), toDelete);
dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
checkDiskError();
dn.checkDiskError();
throw e;
}
metrics.incrBlocksRemoved(toDelete.length);
dn.metrics.incrBlocksRemoved(toDelete.length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// shut down the data node
@ -1448,12 +1299,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
case DatanodeProtocol.DNA_REGISTER:
// namenode requested a registration - at start or if NN lost contact
LOG.info("DatanodeCommand action: DNA_REGISTER");
if (shouldRun && shouldServiceRun) {
if (dn.shouldRun && shouldServiceRun) {
register();
}
break;
case DatanodeProtocol.DNA_FINALIZE:
storage.finalizeUpgrade(((FinalizeCommand) cmd)
dn.storage.finalizeUpgrade(((FinalizeCommand) cmd)
.getBlockPoolId());
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
@ -1461,12 +1312,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.setKeys(blockPoolId,
((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
@ -1476,7 +1327,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
if (bandwidth > 0) {
DataXceiverServer dxcs =
(DataXceiverServer) dataXceiverServer.getRunnable();
(DataXceiverServer) dn.dataXceiverServer.getRunnable();
dxcs.balanceThrottler.setBandwidth(bandwidth);
}
break;
@ -1495,7 +1346,7 @@ private void processDistributedUpgradeCommand(UpgradeCommand comm)
synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null)
upgradeManager =
new UpgradeManagerDatanode(DataNode.this, blockPoolId);
new UpgradeManagerDatanode(dn, blockPoolId);
return upgradeManager;
}
@ -1537,11 +1388,11 @@ void startDataNode(Configuration conf,
this.secureResources = resources;
this.dataDirs = dataDirs;
this.conf = conf;
this.dnConf = new DNConf(conf);
storage = new DataStorage();
// global DN settings
initConfig(conf);
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
@ -1555,6 +1406,133 @@ void startDataNode(Configuration conf,
blockPoolManager = new BlockPoolManager(conf);
}
/**
* Check that the registration returned from a NameNode is consistent
* with the information in the storage. If the storage is fresh/unformatted,
* sets the storage ID based on this registration.
* Also updates the block pool's state in the secret manager.
*/
private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId)
throws IOException {
hostName = bpRegistration.getHost();
if (storage.getStorageID().equals("")) {
// This is a fresh datanode -- take the storage ID provided by the
// NN and persist it.
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
}
/**
* After the block pool has contacted the NN, registers that block pool
* with the secret manager, updating it with the secrets provided by the NN.
* @param bpRegistration
* @param blockPoolId
* @throws IOException
*/
private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
String blockPoolId) throws IOException {
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
// TODO should we check that all federated nns are either enabled or
// disabled?
if (!isBlockTokenEnabled) return;
if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
/**
* Remove the given block pool from the block scanner, dataset, and storage.
*/
private void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
String bpId = bpos.getBlockPoolId();
if (blockScanner != null) {
blockScanner.removeBlockPool(bpId);
}
if (data != null) {
data.shutdownBlockPool(bpId);
}
if (storage != null) {
storage.removeBlockPoolStorage(bpId);
}
}
void initBlockPool(BPOfferService bpOfferService,
NamespaceInfo nsInfo) throws IOException {
String blockPoolId = nsInfo.getBlockPoolID();
blockPoolManager.addBlockPool(bpOfferService);
synchronized (this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setClusterId(nsInfo.clusterID);
}
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (!simulatedFSDataset) {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
dataDirs, startOpt);
StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
LOG.info("setting up storage: nsid=" +
bpStorage.getNamespaceID() + ";bpid="
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo);
}
initFsDataSet();
initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
}
private DatanodeRegistration createRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort());
return reg;
}
BPOfferService[] getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads();
}
@ -1567,8 +1545,7 @@ int getBpOsCount() {
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
private synchronized void initFsDataSet(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
private synchronized void initFsDataSet() throws IOException {
if (data != null) { // Already initialized
return;
}
@ -1663,7 +1640,7 @@ DatanodeRegistration getDNRegistrationByMachineName(String mName) {
* Creates either NIO or regular depending on socketWriteTimeout.
*/
protected Socket newSocket() throws IOException {
return (socketWriteTimeout > 0) ?
return (dnConf.socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
@ -2088,10 +2065,10 @@ public void run() {
InetSocketAddress curTarget =
NetUtils.createSocketAddr(targets[0].getName());
sock = newSocket();
NetUtils.connect(sock, curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
long writeTimeout = socketWriteTimeout +
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
out = new DataOutputStream(new BufferedOutputStream(baseStream,
@ -2534,7 +2511,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
socketTimeout);
dnConf.socketTimeout);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() &&
@ -2923,20 +2900,8 @@ public Long getBalancerBandwidth() {
(DataXceiverServer) this.dataXceiverServer.getRunnable();
return dxcs.balanceThrottler.getBandwidth();
}
long getReadaheadLength() {
return readaheadLength;
}
boolean shouldDropCacheBehindWrites() {
return dropCacheBehindWrites;
}
boolean shouldDropCacheBehindReads() {
return dropCacheBehindReads;
}
boolean shouldSyncBehindWrites() {
return syncBehindWrites;
DNConf getDnConf() {
return dnConf;
}
}

View File

@ -38,7 +38,6 @@
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -82,9 +81,9 @@ class DataXceiver extends Receiver implements Runnable {
private final String remoteAddress; // address of remote side
private final String localAddress; // local address of this daemon
private final DataNode datanode;
private final DNConf dnConf;
private final DataXceiverServer dataXceiverServer;
private int socketKeepaliveTimeout;
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
@ -95,14 +94,11 @@ public DataXceiver(Socket s, DataNode datanode,
this.s = s;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dnConf = datanode.getDnConf();
this.dataXceiverServer = dataXceiverServer;
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
socketKeepaliveTimeout = datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
+ datanode.getXceiverCount());
@ -144,8 +140,8 @@ public void run() {
try {
if (opsProcessed != 0) {
assert socketKeepaliveTimeout > 0;
s.setSoTimeout(socketKeepaliveTimeout);
assert dnConf.socketKeepaliveTimeout > 0;
s.setSoTimeout(dnConf.socketKeepaliveTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
@ -180,7 +176,7 @@ public void run() {
opStartTime = now();
processOp(op);
++opsProcessed;
} while (!s.isClosed() && socketKeepaliveTimeout > 0);
} while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
} catch (Throwable t) {
LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
((op == null) ? "unknown" : op.name()) + " operation " +
@ -205,7 +201,7 @@ public void readBlock(final ExtendedBlock block,
final long blockOffset,
final long length) throws IOException {
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
dnConf.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(out, true, block, blockToken,
@ -231,13 +227,13 @@ public void readBlock(final ExtendedBlock block,
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
throw e;
}
// send op status
writeSuccessWithChecksumInfo(blockSender,
getStreamWithTimeout(s, datanode.socketWriteTimeout));
getStreamWithTimeout(s, dnConf.socketWriteTimeout));
long read = blockSender.sendBlock(out, baseStream, null); // send data
@ -335,7 +331,7 @@ public void writeBlock(final ExtendedBlock block,
// reply to upstream datanode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
@ -370,9 +366,9 @@ public void writeBlock(final ExtendedBlock block,
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = datanode.socketTimeout
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = datanode.socketWriteTimeout +
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
@ -508,7 +504,7 @@ public void transferBlock(final ExtendedBlock blk,
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, null, out);
@ -521,7 +517,7 @@ public void transferBlock(final ExtendedBlock blk,
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
@ -581,7 +577,7 @@ public void copyBlock(final ExtendedBlock block,
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
return;
}
@ -591,7 +587,7 @@ public void copyBlock(final ExtendedBlock block,
String msg = "Not able to copy block " + block.getBlockId() + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.info(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
return;
}
@ -606,7 +602,7 @@ public void copyBlock(final ExtendedBlock block,
// set up response stream
OutputStream baseStream = NetUtils.getOutputStream(
s, datanode.socketWriteTimeout);
s, dnConf.socketWriteTimeout);
reply = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
@ -659,7 +655,7 @@ public void replaceBlock(final ExtendedBlock block,
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
datanode.socketWriteTimeout);
dnConf.socketWriteTimeout);
return;
}
}
@ -668,7 +664,7 @@ public void replaceBlock(final ExtendedBlock block,
String msg = "Not able to receive block " + block.getBlockId() + " from "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.warn(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
return;
}
@ -684,11 +680,11 @@ public void replaceBlock(final ExtendedBlock block,
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
proxySource.getName());
proxySock = datanode.newSocket();
NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
proxySock.setSoTimeout(datanode.socketTimeout);
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
proxySock.setSoTimeout(dnConf.socketTimeout);
OutputStream baseStream = NetUtils.getOutputStream(proxySock,
datanode.socketWriteTimeout);
dnConf.socketWriteTimeout);
proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
HdfsConstants.SMALL_BUFFER_SIZE));
@ -750,7 +746,7 @@ public void replaceBlock(final ExtendedBlock block,
// send response back
try {
sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
@ -826,7 +822,7 @@ private void checkAccess(DataOutputStream out, final boolean reply,
if (reply) {
if (out == null) {
out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
}
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()

View File

@ -67,7 +67,7 @@ check_privsep_dir() {
}
export PATH="${PATH:+$PATH:}/usr/sbin:/usr/bin"
export HADOOP_PREFIX="/usr"
export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr}
case "$1" in
start)

View File

@ -27,7 +27,7 @@ source /etc/default/hadoop-env.sh
RETVAL=0
PIDFILE="${HADOOP_PID_DIR}/hadoop-hdfs-secondarynamenode.pid"
desc="Hadoop secondary namenode daemon"
export HADOOP_PREFIX="/usr"
export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr}
start() {
echo -n $"Starting $desc (hadoop-secondarynamenode): "

View File

@ -61,7 +61,7 @@ public static void setBPNamenodeByIndex(DataNode dn,
bpos.setNamespaceInfo(nsifno);
dn.setBPNamenode(bpid, nn);
bpos.setupBPStorage();
dn.initBlockPool(bpos, nsifno);
}
}
}

View File

@ -154,7 +154,7 @@ public void testBlockMetaDataInfo() throws Exception {
//connect to a data node
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
datanodeinfo[0], conf, datanode.socketTimeout);
datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
assertTrue(datanode != null);
//stop block scanner, so we could compare lastScanTime

View File

@ -34,6 +34,12 @@ Trunk (unreleased changes)
MAPREDUCE-3149. Add a test to verify that TokenCache handles file system
uri with no authority. (John George via jitendra)
MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
client APIs cross MR1 and MR2. (Ahmed via tucu)
MAPREDUCE-3415. improve MiniMRYarnCluster & DistributedShell JAR resolution.
(tucu)
BUG FIXES
MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null.
(amarrk)
@ -94,6 +100,12 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor
has wrong configuration or permissions. (Hitesh Shah via vinodkv)
MAPREDUCE-3373. Hadoop scripts unconditionally source
"$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite)
MAPREDUCE-3372. HADOOP_PREFIX cannot be overridden.
(Bruno Mahé via tomwhite)
OPTIMIZATIONS
BUG FIXES

View File

@ -19,8 +19,10 @@ bin=`which $0`
bin=`dirname ${bin}`
bin=`cd "$bin"; pwd`
if [ -e $bin/../libexec/mapred-config.sh ]; then
. $bin/../libexec/mapred-config.sh
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then
. ${HADOOP_LIBEXEC_DIR}/mapred-config.sh
else
. "$bin/mapred-config.sh"
fi

View File

@ -22,8 +22,10 @@ bin=`which "$0"`
bin=`dirname "${bin}"`
bin=`cd "$bin"; pwd`
if [ -e "$bin/../libexec/hadoop-config.sh" ]; then
. "$bin/../libexec/hadoop-config.sh"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then
. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then
. "$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh
elif [ -e "${HADOOP_COMMON_HOME}/bin/hadoop-config.sh" ]; then

View File

@ -101,16 +101,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<yarn.mr.jar>${project.parent.basedir}/hadoop-mapreduce-client-app/target/hadoop-mapreduce-client-app-${project.version}.jar</yarn.mr.jar>
</systemPropertyVariables>
<environmentVariables>
<JAVA_HOME>${java.home}</JAVA_HOME>
</environmentVariables>
<additionalClasspathElements>
<!-- workaround for JobConf#setJarByClass -->
<additionalClasspathElement>${project.build.directory}/${project.artifactId}-${project.version}-tests.jar</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
</plugin>
</plugins>

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
/*
* A simple interface for a client MR cluster used for testing. This interface
* provides basic methods which are independent of the underlying Mini Cluster (
* either through MR1 or MR2).
*/
public interface MiniMRClientCluster {
public void start() throws IOException;
public void stop() throws IOException;
public Configuration getConfig() throws IOException;
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.util.JarFinder;
/**
* A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster
* interface around the MiniMRYarnCluster. While in MR1, it provides such
* wrapper around MiniMRCluster. This factory should be used in tests to provide
* an easy migration of tests across MR1 and MR2.
*/
public class MiniMRClientClusterFactory {
public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
Configuration conf) throws IOException {
if (conf == null) {
conf = new Configuration();
}
FileSystem fs = FileSystem.get(conf);
Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
.makeQualified(fs);
Path appJar = new Path(testRootDir, "MRAppJar.jar");
// Copy MRAppJar and make it private.
Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
fs.copyFromLocalFile(appMasterJar, appJar);
fs.setPermission(appJar, new FsPermission("700"));
Job job = Job.getInstance(conf);
job.addFileToClassPath(appJar);
String callerJar = JarFinder.getJar(caller);
job.setJar(callerJar);
MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
.getName(), noOfNMs);
miniMRYarnCluster.init(job.getConfiguration());
miniMRYarnCluster.start();
return new MiniMRYarnClusterAdapter(miniMRYarnCluster);
}
}

View File

@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
/**
* This class is an MR2 replacement for older MR1 MiniMRCluster, that was used
* by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster
* in MR2 but provides the same old MR1 interface, so tests can be migrated from
* MR1 to MR2 with minimal changes.
*
* Due to major differences between MR1 and MR2, a number of methods are either
* unimplemented/unsupported or were re-implemented to provide wrappers around
* MR2 functionality.
*/
public class MiniMRCluster {
private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
private MiniMRClientCluster mrClientCluster;
public String getTaskTrackerLocalDir(int taskTracker) {
throw new UnsupportedOperationException();
}
public String[] getTaskTrackerLocalDirs(int taskTracker) {
throw new UnsupportedOperationException();
}
class JobTrackerRunner {
// Mock class
}
class TaskTrackerRunner {
// Mock class
}
public JobTrackerRunner getJobTrackerRunner() {
throw new UnsupportedOperationException();
}
TaskTrackerRunner getTaskTrackerRunner(int id) {
throw new UnsupportedOperationException();
}
public int getNumTaskTrackers() {
throw new UnsupportedOperationException();
}
public void setInlineCleanupThreads() {
throw new UnsupportedOperationException();
}
public void waitUntilIdle() {
throw new UnsupportedOperationException();
}
private void waitTaskTrackers() {
throw new UnsupportedOperationException();
}
public int getJobTrackerPort() {
throw new UnsupportedOperationException();
}
public JobConf createJobConf() {
JobConf jobConf = null;
try {
jobConf = new JobConf(mrClientCluster.getConfig());
} catch (IOException e) {
LOG.error(e);
}
return jobConf;
}
public JobConf createJobConf(JobConf conf) {
JobConf jobConf = null;
try {
jobConf = new JobConf(mrClientCluster.getConfig());
} catch (IOException e) {
LOG.error(e);
}
return jobConf;
}
static JobConf configureJobConf(JobConf conf, String namenode,
int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
throw new UnsupportedOperationException();
}
public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
String[] racks, String[] hosts) throws IOException {
this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
}
public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
String[] racks, String[] hosts, JobConf conf) throws IOException {
this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
}
public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
throws IOException {
this(0, 0, numTaskTrackers, namenode, numDir);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir) throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
null);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks)
throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
racks, null);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks,
String[] hosts) throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
racks, hosts, null);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks,
String[] hosts, UserGroupInformation ugi) throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
racks, hosts, ugi, null);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks,
String[] hosts, UserGroupInformation ugi, JobConf conf)
throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
racks, hosts, ugi, conf, 0);
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks,
String[] hosts, UserGroupInformation ugi, JobConf conf,
int numTrackerToExclude) throws IOException {
this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
}
public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
int numTaskTrackers, String namenode, int numDir, String[] racks,
String[] hosts, UserGroupInformation ugi, JobConf conf,
int numTrackerToExclude, Clock clock) throws IOException {
if (conf == null) conf = new JobConf();
FileSystem.setDefaultUri(conf, namenode);
mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
numTaskTrackers, conf);
}
public UserGroupInformation getUgi() {
throw new UnsupportedOperationException();
}
public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
int max) throws IOException {
throw new UnsupportedOperationException();
}
public void setJobPriority(JobID jobId, JobPriority priority)
throws AccessControlException, IOException {
throw new UnsupportedOperationException();
}
public JobPriority getJobPriority(JobID jobId) {
throw new UnsupportedOperationException();
}
public long getJobFinishTime(JobID jobId) {
throw new UnsupportedOperationException();
}
public void initializeJob(JobID jobId) throws IOException {
throw new UnsupportedOperationException();
}
public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
int index, JobID jobId, int max) throws IOException {
throw new UnsupportedOperationException();
}
public JobConf getJobTrackerConf() {
JobConf jobConf = null;
try {
jobConf = new JobConf(mrClientCluster.getConfig());
} catch (IOException e) {
LOG.error(e);
}
return jobConf;
}
public int getFaultCount(String hostName) {
throw new UnsupportedOperationException();
}
public void startJobTracker() {
// Do nothing
}
public void startJobTracker(boolean wait) {
// Do nothing
}
public void stopJobTracker() {
// Do nothing
}
public void stopTaskTracker(int id) {
// Do nothing
}
public void startTaskTracker(String host, String rack, int idx, int numDir)
throws IOException {
// Do nothing
}
void addTaskTracker(TaskTrackerRunner taskTracker) {
throw new UnsupportedOperationException();
}
int getTaskTrackerID(String trackerName) {
throw new UnsupportedOperationException();
}
public void shutdown() {
try {
mrClientCluster.stop();
} catch (IOException e) {
LOG.error(e);
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
/**
* An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
* This interface could be used by tests across both MR1 and MR2.
*/
public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
private MiniMRYarnCluster miniMRYarnCluster;
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
this.miniMRYarnCluster = miniMRYarnCluster;
}
@Override
public Configuration getConfig() {
return miniMRYarnCluster.getConfig();
}
@Override
public void start() {
miniMRYarnCluster.start();
}
@Override
public void stop() {
miniMRYarnCluster.stop();
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic testing for the MiniMRClientCluster. This test shows an example class
* that can be used in MR1 or MR2, without any change to the test. The test will
* use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
*/
public class TestMiniMRClientCluster {
private static Path inDir = null;
private static Path outDir = null;
private static Path testdir = null;
private static Path[] inFiles = new Path[5];
private static MiniMRClientCluster mrCluster;
@BeforeClass
public static void setup() throws IOException {
final Configuration conf = new Configuration();
final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
"/tmp"));
testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
inDir = new Path(testdir, "in");
outDir = new Path(testdir, "out");
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(testdir) && !fs.delete(testdir, true)) {
throw new IOException("Could not delete " + testdir);
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir);
}
for (int i = 0; i < inFiles.length; i++) {
inFiles[i] = new Path(inDir, "part_" + i);
createFile(inFiles[i], conf);
}
// create the mini cluster to be used for the tests
mrCluster = MiniMRClientClusterFactory.create(
TestMiniMRClientCluster.class, 1, new Configuration());
}
@AfterClass
public static void cleanup() throws IOException {
// clean up the input and output files
final Configuration conf = new Configuration();
final FileSystem fs = testdir.getFileSystem(conf);
if (fs.exists(testdir)) {
fs.delete(testdir, true);
}
// stopping the mini cluster
mrCluster.stop();
}
@Test
public void testJob() throws Exception {
final Job job = createJob();
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
inDir);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
new Path(outDir, "testJob"));
assertTrue(job.waitForCompletion(true));
validateCounters(job.getCounters(), 5, 25, 5, 5);
}
private void validateCounters(Counters counters, long mapInputRecords,
long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
"MyCounterGroup", "MAP_INPUT_RECORDS").getValue());
assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
"MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue());
assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
"MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue());
assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
.findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue());
}
private static void createFile(Path inFile, Configuration conf)
throws IOException {
final FileSystem fs = inFile.getFileSystem(conf);
if (fs.exists(inFile)) {
return;
}
FSDataOutputStream out = fs.create(inFile);
out.writeBytes("This is a test file");
out.close();
}
public static Job createJob() throws IOException {
final Job baseJob = new Job(mrCluster.getConfig());
baseJob.setOutputKeyClass(Text.class);
baseJob.setOutputValueClass(IntWritable.class);
baseJob.setMapperClass(MyMapper.class);
baseJob.setReducerClass(MyReducer.class);
baseJob.setNumReduceTasks(1);
return baseJob;
}
public static class MyMapper extends
org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1);
StringTokenizer iter = new StringTokenizer(value.toString());
while (iter.hasMoreTokens()) {
word.set(iter.nextToken());
context.write(word, one);
context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1);
}
}
}
public static class MyReducer extends
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1);
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS")
.increment(1);
}
}
}

View File

@ -24,13 +24,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -45,8 +45,7 @@
*/
public class MiniMRYarnCluster extends MiniYARNCluster {
public static final String APPJAR = System.getProperty("yarn.mr.jar", JobConf
.findContainingJar(LocalContainerLauncher.class));
public static final String APPJAR = JarFinder.getJar(LocalContainerLauncher.class);
private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
private JobHistoryServer historyServer;
@ -55,7 +54,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
public MiniMRYarnCluster(String testName) {
this(testName, 1);
}
public MiniMRYarnCluster(String testName, int noOfNMs) {
super(testName, noOfNMs);
//TODO: add the history server
@ -88,9 +87,9 @@ public void init(Configuration conf) {
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// Set config for JH Server
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS,
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
super.init(conf);
}

View File

@ -38,7 +38,9 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
# If the slaves file is specified in the command line,
# then it takes precedence over the definition in

View File

@ -23,7 +23,10 @@ echo "starting yarn daemons"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
# start resourceManager
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager
# start nodeManager

View File

@ -23,7 +23,10 @@ echo "stopping yarn daemons"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
# stop resourceManager
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop resourcemanager
# stop nodeManager

View File

@ -44,7 +44,9 @@
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
cygwin=false
case "`uname`" in

View File

@ -39,7 +39,9 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. "$bin"/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
# get arguments
startStop=$1

View File

@ -30,7 +30,9 @@ fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
. $bin/yarn-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
exec "$bin/slaves.sh" --config $YARN_CONF_DIR cd "$YARN_HOME" \; "$bin/yarn-daemon.sh" --config $YARN_CONF_DIR "$@"

View File

@ -82,7 +82,7 @@
<goals>
<goal>jar</goal>
</goals>
<!-- strictly speaking, the unit test is really a regression test. It
<!-- strictly speaking, the unit test is really a regression test. It
needs the main jar to be available to be able to run. -->
<phase>test-compile</phase>
</execution>
@ -109,9 +109,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<yarn.ds.jar>${project.build.directory}/${project.artifactId}-${project.version}.jar</yarn.ds.jar>
</systemPropertyVariables>
<environmentVariables>
<JAVA_HOME>${java.home}</JAVA_HOME>
</environmentVariables>

View File

@ -23,7 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -37,8 +37,7 @@ public class TestDistributedShell {
protected static MiniYARNCluster yarnCluster = null;
protected static Configuration conf = new Configuration();
protected static String APPMASTER_JAR = System.getProperty("yarn.ds.jar",
JobConf.findContainingJar(ApplicationMaster.class));
protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
@BeforeClass
public static void setup() throws InterruptedException, IOException {

View File

@ -24,7 +24,9 @@ params=$#
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# get arguments
if [ $# -ge 1 ]; then

View File

@ -25,7 +25,9 @@ params=$#
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# get arguments
if [ $# -ge 1 ]; then

View File

@ -24,7 +24,9 @@ params=$#
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# get arguments
if [ $# -ge 1 ]; then

View File

@ -25,7 +25,9 @@ params=$#
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hadoop-config.sh
DEFAULT_LIBEXEC_DIR="$bin"
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh
# get arguments
if [ $# -ge 1 ]; then

View File

@ -12,7 +12,7 @@
# limitations under the License.
export HADOOP_PREFIX=../../..
export HADOOP_PREFIX=${HADOOP_PREFIX:-../../..}
export CLASSPATH="$HADOOP_PREFIX/build/classes"

View File

@ -12,7 +12,7 @@
# limitations under the License.
export HADOOP_PREFIX=../../../../..
export HADOOP_PREFIX=${HADOOP_PREFIX:-../../../../..}
export CLASSPATH="$HADOOP_PREFIX/build/classes"
export CLASSPATH=${CLASSPATH}:"$HADOOP_PREFIX/build/contrib/abacus/classes"

View File

@ -67,7 +67,7 @@ check_privsep_dir() {
}
export PATH="${PATH:+$PATH:}/usr/sbin:/usr/bin"
export HADOOP_PREFIX="/usr"
export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr}
case "$1" in
start)

View File

@ -27,7 +27,7 @@ source /etc/default/hadoop-env.sh
RETVAL=0
PIDFILE="${HADOOP_PID_DIR}/hadoop-mapred-historyserver.pid"
desc="Hadoop historyserver daemon"
export HADOOP_PREFIX="/usr"
export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr}
start() {
echo -n $"Starting $desc (hadoop-historyserver): "

View File

@ -264,7 +264,7 @@
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<finalName>hadoop-dist-${project.version}-src</finalName>
<finalName>hadoop-${project.version}-src</finalName>
<outputDirectory>hadoop-dist/target</outputDirectory>
<!-- Not using descriptorRef and hadoop-assembly dependency -->
<!-- to avoid making hadoop-main to depend on a module -->
@ -288,7 +288,7 @@
<configuration>
<target>
<echo/>
<echo>Hadoop source tar available at: ${basedir}/hadoop-dist/target/hadoop-dist-${project.version}-src.tar.gz</echo>
<echo>Hadoop source tar available at: ${basedir}/hadoop-dist/target/hadoop-${project.version}-src.tar.gz</echo>
<echo/>
</target>
</configuration>