Merge trunk into auto-HA branch

Resolved some trivial conflicts in NNHAServiceTarget


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1309576 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-04-04 20:00:15 +00:00
commit 00a1548005
32 changed files with 665 additions and 221 deletions

View File

@ -245,6 +245,8 @@ Release 2.0.0 - UNRELEASED
HADOOP-8242. AbstractDelegationTokenIdentifier: add getter methods HADOOP-8242. AbstractDelegationTokenIdentifier: add getter methods
for owner and realuser. (Colin Patrick McCabe via eli) for owner and realuser. (Colin Patrick McCabe via eli)
HADOOP-8007. Use substitution tokens for fencing argument (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -381,6 +383,9 @@ Release 0.23.3 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-8088. User-group mapping cache incorrectly does negative caching on
transient failures (Kihwal Lee via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -476,9 +481,6 @@ Release 0.23.2 - UNRELEASED
HADOOP-8176. Disambiguate the destination of FsShell copies (Daryn Sharp HADOOP-8176. Disambiguate the destination of FsShell copies (Daryn Sharp
via bobby) via bobby)
HADOOP-8088. User-group mapping cache incorrectly does negative caching on
transient failures (Kihwal Lee via bobby)
HADOOP-8208. Disallow self failover. (eli) HADOOP-8208. Disallow self failover. (eli)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17

View File

@ -19,6 +19,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -29,6 +30,8 @@
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import com.google.common.collect.Maps;
/** /**
* Represents a target of the client side HA administration commands. * Represents a target of the client side HA administration commands.
*/ */
@ -36,6 +39,10 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class HAServiceTarget { public abstract class HAServiceTarget {
private static final String HOST_SUBST_KEY = "host";
private static final String PORT_SUBST_KEY = "port";
private static final String ADDRESS_SUBST_KEY = "address";
/** /**
* @return the IPC address of the target node. * @return the IPC address of the target node.
*/ */
@ -68,4 +75,28 @@ public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
getAddress(), getAddress(),
confCopy, factory, timeoutMs); confCopy, factory, timeoutMs);
} }
public final Map<String, String> getFencingParameters() {
Map<String, String> ret = Maps.newHashMap();
addFencingParameters(ret);
return ret;
}
/**
* Hook to allow subclasses to add any parameters they would like to
* expose to fencing implementations/scripts. Fencing methods are free
* to use this map as they see fit -- notably, the shell script
* implementation takes each entry, prepends 'target_', substitutes
* '_' for '.', and adds it to the environment of the script.
*
* Subclass implementations should be sure to delegate to the superclass
* implementation in addition to adding their own keys.
*
* @param ret map which can be mutated to pass parameters to the fencer
*/
protected void addFencingParameters(Map<String, String> ret) {
ret.put(ADDRESS_SUBST_KEY, String.valueOf(getAddress()));
ret.put(HOST_SUBST_KEY, getAddress().getHostName());
ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
}
} }
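As an illustration of the contract described above (not part of this patch), a subclass can contribute its own keys while keeping the default host/port/address entries by delegating to the superclass. A minimal sketch; the class name and the extra key are purely hypothetical:

import java.util.Map;

import org.apache.hadoop.ha.HAServiceTarget;

// Hypothetical sketch: expose one extra fencing parameter. The other abstract
// methods of HAServiceTarget are omitted here, hence the class stays abstract.
abstract class ExampleServiceTarget extends HAServiceTarget {
  @Override
  protected void addFencingParameters(Map<String, String> ret) {
    super.addFencingParameters(ret);          // keeps host, port and address
    ret.put("example.key", "example-value");  // seen by shell fencers as $target_example_key
  }
}

NNHAServiceTarget, later in this change, follows exactly this pattern to expose the nameservice and namenode IDs.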

View File

@ -19,16 +19,11 @@
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -61,6 +56,9 @@ public class ShellCommandFencer
/** Length at which to abbreviate command in long messages */ /** Length at which to abbreviate command in long messages */
private static final int ABBREV_LENGTH = 20; private static final int ABBREV_LENGTH = 20;
/** Prefix for target parameters added to the environment */
private static final String TARGET_PREFIX = "target_";
@VisibleForTesting @VisibleForTesting
static Log LOG = LogFactory.getLog( static Log LOG = LogFactory.getLog(
ShellCommandFencer.class); ShellCommandFencer.class);
@ -76,19 +74,10 @@ public void checkArgs(String args) throws BadFencingConfigurationException {
@Override @Override
public boolean tryFence(HAServiceTarget target, String cmd) { public boolean tryFence(HAServiceTarget target, String cmd) {
InetSocketAddress serviceAddr = target.getAddress();
List<String> cmdList = Arrays.asList(cmd.split("\\s+"));
// Create arg list with service as the first argument
List<String> argList = new ArrayList<String>();
argList.add(cmdList.get(0));
argList.add(serviceAddr.getHostName() + ":" + serviceAddr.getPort());
argList.addAll(cmdList.subList(1, cmdList.size()));
String cmdWithSvc = StringUtils.join(" ", argList);
ProcessBuilder builder = new ProcessBuilder( ProcessBuilder builder = new ProcessBuilder(
"bash", "-e", "-c", cmdWithSvc); "bash", "-e", "-c", cmd);
setConfAsEnvVars(builder.environment()); setConfAsEnvVars(builder.environment());
addTargetInfoAsEnvVars(target, builder.environment());
Process p; Process p;
try { try {
@ -185,4 +174,21 @@ private void setConfAsEnvVars(Map<String, String> env) {
env.put(pair.getKey().replace('.', '_'), pair.getValue()); env.put(pair.getKey().replace('.', '_'), pair.getValue());
} }
} }
/**
* Add information about the target to the environment of the
* subprocess.
*
* @param target the fencing target whose parameters are being exposed
* @param environment the subprocess environment map to add entries to
*/
private void addTargetInfoAsEnvVars(HAServiceTarget target,
Map<String, String> environment) {
for (Map.Entry<String, String> e :
target.getFencingParameters().entrySet()) {
String key = TARGET_PREFIX + e.getKey();
key = key.replace('.', '_');
environment.put(key, e.getValue());
}
}
} }
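Since the target's host and port are no longer prepended to the fencing command line, shell fencing methods now pick them up from the environment variables populated by addTargetInfoAsEnvVars. A rough configuration sketch; the fencing script path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.NodeFencer;

public class FencerConfigSketch {
  public static void main(String[] args) {
    // The shell(...) method runs under bash, so $target_host, $target_port and
    // $target_address expand from the subprocess environment set up above.
    // The fence-node.sh path is a hypothetical operator-provided script.
    Configuration conf = new Configuration();
    conf.set(NodeFencer.CONF_METHODS_KEY,
        "shell(/usr/local/bin/fence-node.sh $target_host $target_port)");
    System.out.println(conf.get(NodeFencer.CONF_METHODS_KEY));
  }
}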

View File

@ -103,7 +103,7 @@ public void testCheckParensNoArgs() {
public void testStdoutLogging() { public void testStdoutLogging() {
assertTrue(fencer.tryFence(TEST_TARGET, "echo hello")); assertTrue(fencer.tryFence(TEST_TARGET, "echo hello"));
Mockito.verify(ShellCommandFencer.LOG).info( Mockito.verify(ShellCommandFencer.LOG).info(
Mockito.endsWith("echo hello: host:1234 hello")); Mockito.endsWith("echo hello: hello"));
} }
/** /**
@ -114,7 +114,7 @@ public void testStdoutLogging() {
public void testStderrLogging() { public void testStderrLogging() {
assertTrue(fencer.tryFence(TEST_TARGET, "echo hello >&2")); assertTrue(fencer.tryFence(TEST_TARGET, "echo hello >&2"));
Mockito.verify(ShellCommandFencer.LOG).warn( Mockito.verify(ShellCommandFencer.LOG).warn(
Mockito.endsWith("echo hello >&2: host:1234 hello")); Mockito.endsWith("echo hello >&2: hello"));
} }
/** /**
@ -125,9 +125,21 @@ public void testStderrLogging() {
public void testConfAsEnvironment() { public void testConfAsEnvironment() {
fencer.tryFence(TEST_TARGET, "echo $in_fencing_tests"); fencer.tryFence(TEST_TARGET, "echo $in_fencing_tests");
Mockito.verify(ShellCommandFencer.LOG).info( Mockito.verify(ShellCommandFencer.LOG).info(
Mockito.endsWith("echo $in...ing_tests: host:1234 yessir")); Mockito.endsWith("echo $in...ing_tests: yessir"));
} }
/**
* Verify that information about the fencing target gets passed as
* environment variables to the fencer.
*/
@Test
public void testTargetAsEnvironment() {
fencer.tryFence(TEST_TARGET, "echo $target_host $target_port $target_address");
Mockito.verify(ShellCommandFencer.LOG).info(
Mockito.endsWith("echo $ta...t_address: host 1234 host:1234"));
}
/** /**
* Test that we properly close off our input to the subprocess * Test that we properly close off our input to the subprocess
* such that it knows there's no tty connected. This is important * such that it knows there's no tty connected. This is important

View File

@ -22,9 +22,6 @@ Trunk (unreleased changes)
HDFS-2572. Remove unnecessary double-check in DN#getHostName. (harsh) HDFS-2572. Remove unnecessary double-check in DN#getHostName. (harsh)
HDFS-2564. Cleanup unnecessary exceptions thrown and unnecessary casts.
(Hari Mankude via eli)
HDFS-2857. Cleanup BlockInfo class. (suresh) HDFS-2857. Cleanup BlockInfo class. (suresh)
HDFS-2786. Fix host-based token incompatibilities in DFSUtil. (Kihwal Lee HDFS-2786. Fix host-based token incompatibilities in DFSUtil. (Kihwal Lee
@ -185,6 +182,8 @@ Release 2.0.0 - UNRELEASED
HDFS-3148. The client should be able to use multiple local interfaces HDFS-3148. The client should be able to use multiple local interfaces
for data transfer. (eli) for data transfer. (eli)
HDFS-3000. Add a public API for setting quotas. (atm)
IMPROVEMENTS IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place. HDFS-2018. Move all journal stream management code into one place.
@ -306,6 +305,17 @@ Release 2.0.0 - UNRELEASED
HDFS-3120. Enable hsync and hflush by default. (eli) HDFS-3120. Enable hsync and hflush by default. (eli)
HDFS-3187. Upgrade guava to 11.0.2 (todd)
HDFS-3168. Remove unnecessary "throw IOException" and change fields to
final in FSNamesystem and BlockManager. (szetszwo)
HDFS-2564. Cleanup unnecessary exceptions thrown and unnecessary casts.
(Hari Mankude via eli)
HDFS-3084. FenceMethod.tryFence() and ShellCommandFencer should pass
namenodeId as well as host:port (todd)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd) HDFS-3024. Improve performance of stringification in addStoredBlock (todd)

View File

@ -1026,13 +1026,7 @@ public static String getNamenodeServiceAddr(final Configuration conf,
String nsId, String nnId) { String nsId, String nnId) {
if (nsId == null) { if (nsId == null) {
Collection<String> nsIds = getNameServiceIds(conf); nsId = getOnlyNameServiceIdOrNull(conf);
if (1 == nsIds.size()) {
nsId = nsIds.toArray(new String[1])[0];
} else {
// No nameservice ID was given and more than one is configured
return null;
}
} }
String serviceAddrKey = concatSuffixes( String serviceAddrKey = concatSuffixes(
@ -1047,4 +1041,18 @@ public static String getNamenodeServiceAddr(final Configuration conf,
} }
return serviceRpcAddr; return serviceRpcAddr;
} }
/**
* If the configuration refers to only a single nameservice, return the
* name of that nameservice. If it refers to 0 or more than 1, return null.
*/
public static String getOnlyNameServiceIdOrNull(Configuration conf) {
Collection<String> nsIds = getNameServiceIds(conf);
if (1 == nsIds.size()) {
return nsIds.toArray(new String[1])[0];
} else {
// Zero nameservices or more than one nameservice is configured
return null;
}
}
} }

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
/**
* The public API for performing administrative functions on HDFS. Those writing
* applications against HDFS should prefer this interface to directly accessing
* functionality in DistributedFileSystem or DFSClient.
*
* Note that this is distinct from the similarly-named {@link DFSAdmin}, which
* provides the functionality behind the CLI `hdfs dfsadmin ...' commands.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsAdmin {
private DistributedFileSystem dfs;
/**
* Create a new HdfsAdmin client.
*
* @param uri the unique URI of the HDFS file system to administer
* @param conf configuration
* @throws IOException in the event the file system could not be created
*/
public HdfsAdmin(URI uri, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(uri, conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IllegalArgumentException("'" + uri + "' is not an HDFS URI.");
} else {
dfs = (DistributedFileSystem)fs;
}
}
/**
* Set the namespace quota (count of files, directories, and sym links) for a
* directory.
*
* @param src the path to set the quota for
* @param quota the value to set for the quota
* @throws IOException in the event of error
*/
public void setQuota(Path src, long quota) throws IOException {
dfs.setQuota(src, quota, HdfsConstants.QUOTA_DONT_SET);
}
/**
* Clear the namespace quota (count of files, directories and sym links) for a
* directory.
*
* @param src the path to clear the quota of
* @throws IOException in the event of error
*/
public void clearQuota(Path src) throws IOException {
dfs.setQuota(src, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET);
}
/**
* Set the disk space quota (size of files) for a directory. Note that
* directories and sym links do not occupy disk space.
*
* @param src the path to set the space quota of
* @param spaceQuota the value to set for the space quota
* @throws IOException in the event of error
*/
public void setSpaceQuota(Path src, long spaceQuota) throws IOException {
dfs.setQuota(src, HdfsConstants.QUOTA_DONT_SET, spaceQuota);
}
/**
* Clear the disk space quota (size of files) for a directory. Note that
* directories and sym links do not occupy disk space.
*
* @param src the path to clear the space quota of
* @throws IOException in the event of error
*/
public void clearSpaceQuota(Path src) throws IOException {
dfs.setQuota(src, HdfsConstants.QUOTA_DONT_SET, HdfsConstants.QUOTA_RESET);
}
}
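A minimal usage sketch for the API above; the cluster URI, directory and quota values are hypothetical:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

public class HdfsAdminQuotaSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical cluster URI and directory.
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://nn.example.com:8020"), conf);

    Path dir = new Path("/projects/example");
    admin.setQuota(dir, 100000);                         // at most 100k names under dir
    admin.setSpaceQuota(dir, 10L * 1024 * 1024 * 1024);  // 10 GB of file data
    admin.clearSpaceQuota(dir);                          // drop the space limit again
    admin.clearQuota(dir);                               // drop the namespace limit too
  }
}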

View File

@ -253,9 +253,9 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
this.replicationRecheckInterval = this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
LOG.info("defaultReplication = " + defaultReplication); LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication); LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication); LOG.info("minReplication = " + minReplication);
LOG.info("maxReplicationStreams = " + maxReplicationStreams); LOG.info("maxReplicationStreams = " + maxReplicationStreams);
LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval); LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
@ -1030,7 +1030,7 @@ int computeInvalidateWork(int nodesToProcess) {
* *
* @return number of blocks scheduled for replication during this iteration. * @return number of blocks scheduled for replication during this iteration.
*/ */
int computeReplicationWork(int blocksToProcess) throws IOException { int computeReplicationWork(int blocksToProcess) {
List<List<Block>> blocksToReplicate = null; List<List<Block>> blocksToReplicate = null;
namesystem.writeLock(); namesystem.writeLock();
try { try {
@ -2174,7 +2174,7 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
/** Set replication for the blocks. */ /** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl, public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) throws IOException { final String src, final Block... blocks) {
if (newRepl == oldRepl) { if (newRepl == oldRepl) {
return; return;
} }
@ -2937,8 +2937,6 @@ public void run() {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("ReplicationMonitor thread received InterruptedException.", ie); LOG.warn("ReplicationMonitor thread received InterruptedException.", ie);
break; break;
} catch (IOException ie) {
LOG.warn("ReplicationMonitor thread received exception. " , ie);
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("ReplicationMonitor thread received Runtime exception. ", t); LOG.warn("ReplicationMonitor thread received Runtime exception. ", t);
Runtime.getRuntime().exit(-1); Runtime.getRuntime().exit(-1);
@ -2956,14 +2954,14 @@ public void run() {
* @return number of blocks scheduled for replication or removal. * @return number of blocks scheduled for replication or removal.
* @throws IOException * @throws IOException
*/ */
int computeDatanodeWork() throws IOException { int computeDatanodeWork() {
int workFound = 0;
// Blocks should not be replicated or removed if in safe mode. // Blocks should not be replicated or removed if in safe mode.
// It's OK to check safe mode here w/o holding lock, in the worst // It's OK to check safe mode here w/o holding lock, in the worst
// case extra replications will be scheduled, and these will get // case extra replications will be scheduled, and these will get
// fixed up later. // fixed up later.
if (namesystem.isInSafeMode()) if (namesystem.isInSafeMode()) {
return workFound; return 0;
}
final int numlive = heartbeatManager.getLiveDatanodeCount(); final int numlive = heartbeatManager.getLiveDatanodeCount();
final int blocksToProcess = numlive final int blocksToProcess = numlive
@ -2971,7 +2969,7 @@ int computeDatanodeWork() throws IOException {
final int nodesToProcess = (int) Math.ceil(numlive final int nodesToProcess = (int) Math.ceil(numlive
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0); * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
workFound = this.computeReplicationWork(blocksToProcess); int workFound = this.computeReplicationWork(blocksToProcess);
// Update counters // Update counters
namesystem.writeLock(); namesystem.writeLock();

View File

@ -25,15 +25,17 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
@ -49,15 +51,13 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
@ -150,9 +150,9 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@ -260,30 +260,28 @@ private static final void logAuditEvent(UserGroupInformation ugi,
static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100; static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
static int BLOCK_DELETION_INCREMENT = 1000; static int BLOCK_DELETION_INCREMENT = 1000;
private boolean isPermissionEnabled; private final boolean isPermissionEnabled;
private boolean persistBlocks; private final boolean persistBlocks;
private UserGroupInformation fsOwner; private final UserGroupInformation fsOwner;
private String supergroup; private final String supergroup;
private boolean standbyShouldCheckpoint; private final boolean standbyShouldCheckpoint;
// Scan interval is not configurable. // Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
private DelegationTokenSecretManager dtSecretManager; private final DelegationTokenSecretManager dtSecretManager;
private boolean alwaysUseDelegationTokensForTests; private final boolean alwaysUseDelegationTokensForTests;
// /** The namespace tree. */
// Stores the correct file name hierarchy
//
FSDirectory dir; FSDirectory dir;
private BlockManager blockManager; private final BlockManager blockManager;
private DatanodeStatistics datanodeStatistics; private final DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode // Block pool ID used by this namenode
private String blockPoolId; private String blockPoolId;
LeaseManager leaseManager = new LeaseManager(this); final LeaseManager leaseManager = new LeaseManager(this);
Daemon smmthread = null; // SafeModeMonitor thread Daemon smmthread = null; // SafeModeMonitor thread
@ -291,23 +289,23 @@ private static final void logAuditEvent(UserGroupInformation ugi,
private volatile boolean hasResourcesAvailable = false; private volatile boolean hasResourcesAvailable = false;
private volatile boolean fsRunning = true; private volatile boolean fsRunning = true;
long systemStart = 0;
//resourceRecheckInterval is how often namenode checks for the disk space availability /** The start time of the namesystem. */
private long resourceRecheckInterval; private final long startTime = now();
/** The interval of namenode checking for the disk space availability */
private final long resourceRecheckInterval;
// The actual resource checker instance. // The actual resource checker instance.
NameNodeResourceChecker nnResourceChecker; NameNodeResourceChecker nnResourceChecker;
private FsServerDefaults serverDefaults; private final FsServerDefaults serverDefaults;
private final boolean supportAppends;
private boolean supportAppends; private final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information private volatile SafeModeInfo safeMode; // safe mode information
private long maxFsObjects = 0; // maximum number of fs objects private final long maxFsObjects; // maximum number of fs objects
/** /**
* The global generation stamp for this file system. * The global generation stamp for this file system.
@ -315,10 +313,10 @@ private static final void logAuditEvent(UserGroupInformation ugi,
private final GenerationStamp generationStamp = new GenerationStamp(); private final GenerationStamp generationStamp = new GenerationStamp();
// precision of access times. // precision of access times.
private long accessTimePrecision = 0; private final long accessTimePrecision;
// lock to protect FSNamesystem. /** Lock to protect FSNamesystem. */
private ReentrantReadWriteLock fsLock; private ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
/** /**
* Used when this NN is in standby state to read from the shared edit log. * Used when this NN is in standby state to read from the shared edit log.
@ -336,9 +334,7 @@ private static final void logAuditEvent(UserGroupInformation ugi,
*/ */
private HAContext haContext; private HAContext haContext;
private boolean haEnabled; private final boolean haEnabled;
private final Configuration conf;
/** /**
* Instantiates an FSNamesystem loaded from the image and edits * Instantiates an FSNamesystem loaded from the image and edits
@ -390,9 +386,71 @@ public static FSNamesystem loadFromDisk(Configuration conf)
* @throws IOException on bad configuration * @throws IOException on bad configuration
*/ */
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException { FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
this.conf = conf;
try { try {
initialize(conf, fsImage); resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsOwner = UserGroupInformation.getCurrentUser();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
DFS_PERMISSIONS_ENABLED_DEFAULT);
LOG.info("fsOwner = " + fsOwner);
LOG.info("supergroup = " + supergroup);
LOG.info("isPermissionEnabled = " + isPermissionEnabled);
final boolean persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
DFS_PERSIST_BLOCKS_DEFAULT);
// block allocation has to be persisted in HA using a shared edits directory
// so that the standby has up-to-date namespace information
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);
this.persistBlocks = persistBlocks || (haEnabled && HAUtil.usesSharedEditsDir(conf));
// Sanity check the HA-related config.
if (nameserviceId != null) {
LOG.info("Determined nameservice ID: " + nameserviceId);
}
LOG.info("HA Enabled: " + haEnabled);
if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
throw new IOException("Invalid configuration: a shared edits dir " +
"must not be specified if HA is not enabled.");
}
this.serverDefaults = new FsServerDefaults(
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
LOG.info("Append Enabled: " + haEnabled);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.standbyShouldCheckpoint = conf.getBoolean(
DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean(
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.safeMode = new SafeModeInfo(conf);
} catch(IOException e) { } catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e); LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close(); close();
@ -400,24 +458,6 @@ public static FSNamesystem loadFromDisk(Configuration conf)
} }
} }
/**
* Initialize FSNamesystem.
*/
private void initialize(Configuration conf, FSImage fsImage)
throws IOException {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
this.systemStart = now();
this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.safeMode = new SafeModeInfo(conf);
}
void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled) void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
throws IOException { throws IOException {
// format before starting up if requested // format before starting up if requested
@ -601,13 +641,13 @@ void stopActiveServices() {
} }
/** Start services required in standby state */ /** Start services required in standby state */
void startStandbyServices() { void startStandbyServices(final Configuration conf) {
LOG.info("Starting services required for standby state"); LOG.info("Starting services required for standby state");
if (!dir.fsImage.editLog.isOpenForRead()) { if (!dir.fsImage.editLog.isOpenForRead()) {
// During startup, we're already open for read. // During startup, we're already open for read.
dir.fsImage.editLog.initSharedJournalsForRead(); dir.fsImage.editLog.initSharedJournalsForRead();
} }
editLogTailer = new EditLogTailer(this); editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start(); editLogTailer.start();
if (standbyShouldCheckpoint) { if (standbyShouldCheckpoint) {
standbyCheckpointer = new StandbyCheckpointer(conf, this); standbyCheckpointer = new StandbyCheckpointer(conf, this);
@ -769,10 +809,6 @@ public static List<URI> getSharedEditsDirs(Configuration conf) {
return Util.stringCollectionAsURIs(dirNames); return Util.stringCollectionAsURIs(dirNames);
} }
public Configuration getConf() {
return conf;
}
@Override @Override
public void readLock() { public void readLock() {
this.fsLock.readLock().lock(); this.fsLock.readLock().lock();
@ -806,69 +842,6 @@ public boolean hasReadOrWriteLock() {
return hasReadLock() || hasWriteLock(); return hasReadLock() || hasWriteLock();
} }
/**
* Initializes some of the members from configuration
*/
private void setConfigurationParameters(Configuration conf)
throws IOException {
fsOwner = UserGroupInformation.getCurrentUser();
LOG.info("fsOwner=" + fsOwner);
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
DFS_PERMISSIONS_ENABLED_DEFAULT);
LOG.info("supergroup=" + supergroup);
LOG.info("isPermissionEnabled=" + isPermissionEnabled);
this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
DFS_PERSIST_BLOCKS_DEFAULT);
// block allocation has to be persisted in HA using a shared edits directory
// so that the standby has up-to-date namespace information
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);
this.persistBlocks |= haEnabled && HAUtil.usesSharedEditsDir(conf);
// Sanity check the HA-related config.
if (nameserviceId != null) {
LOG.info("Determined nameservice ID: " + nameserviceId);
}
LOG.info("HA Enabled: " + haEnabled);
if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
throw new IOException("Invalid configuration: a shared edits dir " +
"must not be specified if HA is not enabled.");
}
this.serverDefaults = new FsServerDefaults(
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY,
DFS_SUPPORT_APPEND_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.standbyShouldCheckpoint = conf.getBoolean(
DFS_HA_STANDBY_CHECKPOINTS_KEY,
DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests =
conf.getBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
}
NamespaceInfo getNamespaceInfo() { NamespaceInfo getNamespaceInfo() {
readLock(); readLock();
try { try {
@ -2761,7 +2734,7 @@ boolean internalReleaseLease(Lease lease, String src,
} }
private Lease reassignLease(Lease lease, String src, String newHolder, private Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException { INodeFileUnderConstruction pendingFile) {
assert hasWriteLock(); assert hasWriteLock();
if(newHolder == null) if(newHolder == null)
return lease; return lease;
@ -3329,7 +3302,7 @@ boolean restoreFailedStorage(String arg) throws AccessControlException {
} }
Date getStartTime() { Date getStartTime() {
return new Date(systemStart); return new Date(startTime);
} }
void finalizeUpgrade() throws IOException { void finalizeUpgrade() throws IOException {
@ -3506,7 +3479,7 @@ private synchronized void leave(boolean checkForUpgrades) {
if (!isPopulatingReplQueues() && !isInStandbyState()) { if (!isPopulatingReplQueues() && !isInStandbyState()) {
initializeReplQueues(); initializeReplQueues();
} }
long timeInSafemode = now() - systemStart; long timeInSafemode = now() - startTime;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after " NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs."); + timeInSafemode/1000 + " secs.");
NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode); NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
@ -4876,7 +4849,7 @@ void loadSecretManagerState(DataInputStream in) throws IOException {
* *
* @param key new delegation key. * @param key new delegation key.
*/ */
public void logUpdateMasterKey(DelegationKey key) throws IOException { public void logUpdateMasterKey(DelegationKey key) {
assert !isInSafeMode() : assert !isInSafeMode() :
"this should never be called while in safemode, since we stop " + "this should never be called while in safemode, since we stop " +
@ -4889,7 +4862,7 @@ public void logUpdateMasterKey(DelegationKey key) throws IOException {
} }
private void logReassignLease(String leaseHolder, String src, private void logReassignLease(String leaseHolder, String src,
String newHolder) throws IOException { String newHolder) {
writeLock(); writeLock();
try { try {
getEditLog().logReassignLease(leaseHolder, src, newHolder); getEditLog().logReassignLease(leaseHolder, src, newHolder);

View File

@ -1061,7 +1061,7 @@ public void stopActiveServices() throws IOException {
@Override @Override
public void startStandbyServices() throws IOException { public void startStandbyServices() throws IOException {
namesystem.startStandbyServices(); namesystem.startStandbyServices(conf);
} }
@Override @Override

View File

@ -61,6 +61,7 @@ public class EditLogTailer {
private final EditLogTailerThread tailerThread; private final EditLogTailerThread tailerThread;
private final Configuration conf;
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
private FSEditLog editLog; private FSEditLog editLog;
@ -98,13 +99,12 @@ public class EditLogTailer {
*/ */
private long sleepTimeMs; private long sleepTimeMs;
public EditLogTailer(FSNamesystem namesystem) { public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
this.tailerThread = new EditLogTailerThread(); this.tailerThread = new EditLogTailerThread();
this.conf = conf;
this.namesystem = namesystem; this.namesystem = namesystem;
this.editLog = namesystem.getEditLog(); this.editLog = namesystem.getEditLog();
Configuration conf = namesystem.getConf();
lastLoadTimestamp = now(); lastLoadTimestamp = now();
logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
@ -129,14 +129,12 @@ public EditLogTailer(FSNamesystem namesystem) {
} }
private InetSocketAddress getActiveNodeAddress() { private InetSocketAddress getActiveNodeAddress() {
Configuration conf = namesystem.getConf();
Configuration activeConf = HAUtil.getConfForOtherNode(conf); Configuration activeConf = HAUtil.getConfForOtherNode(conf);
return NameNode.getServiceAddress(activeConf, true); return NameNode.getServiceAddress(activeConf, true);
} }
private NamenodeProtocol getActiveNodeProxy() throws IOException { private NamenodeProtocol getActiveNodeProxy() throws IOException {
if (cachedActiveProxy == null) { if (cachedActiveProxy == null) {
Configuration conf = namesystem.getConf();
NamenodeProtocolPB proxy = NamenodeProtocolPB proxy =
RPC.waitForProxy(NamenodeProtocolPB.class, RPC.waitForProxy(NamenodeProtocolPB.class,
RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf); RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.tools; package org.apache.hadoop.hdfs.tools;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -25,9 +26,12 @@
import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer; import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import com.google.common.base.Preconditions;
/** /**
* One of the NN NameNodes acting as the target of an administrative command * One of the NN NameNodes acting as the target of an administrative command
* (e.g. failover). * (e.g. failover).
@ -35,16 +39,36 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public class NNHAServiceTarget extends HAServiceTarget { public class NNHAServiceTarget extends HAServiceTarget {
// Keys added to the fencing script environment
private static final String NAMESERVICE_ID_KEY = "nameserviceid";
private static final String NAMENODE_ID_KEY = "namenodeid";
private final InetSocketAddress addr; private final InetSocketAddress addr;
private NodeFencer fencer; private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError; private BadFencingConfigurationException fenceConfigError;
private final String nnId; private final String nnId;
private final String nsId; private final String nsId;
public NNHAServiceTarget(Configuration localNNConf, public NNHAServiceTarget(Configuration conf,
String nsId, String nnId) { String nsId, String nnId) {
Preconditions.checkNotNull(nnId);
if (nsId == null) {
nsId = DFSUtil.getOnlyNameServiceIdOrNull(conf);
if (nsId == null) {
throw new IllegalArgumentException(
"Unable to determine the nameservice id.");
}
}
assert nsId != null;
// Make a copy of the conf, and override configs based on the
// target node -- not the node we happen to be running on.
HdfsConfiguration targetConf = new HdfsConfiguration(conf);
NameNode.initializeGenericKeys(targetConf, nsId, nnId);
String serviceAddr = String serviceAddr =
DFSUtil.getNamenodeServiceAddr(localNNConf, nsId, nnId); DFSUtil.getNamenodeServiceAddr(targetConf, nsId, nnId);
if (serviceAddr == null) { if (serviceAddr == null) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Unable to determine service address for namenode '" + nnId + "'"); "Unable to determine service address for namenode '" + nnId + "'");
@ -52,7 +76,7 @@ public NNHAServiceTarget(Configuration localNNConf,
this.addr = NetUtils.createSocketAddr(serviceAddr, this.addr = NetUtils.createSocketAddr(serviceAddr,
NameNode.DEFAULT_PORT); NameNode.DEFAULT_PORT);
try { try {
this.fencer = NodeFencer.create(localNNConf); this.fencer = NodeFencer.create(targetConf);
} catch (BadFencingConfigurationException e) { } catch (BadFencingConfigurationException e) {
this.fenceConfigError = e; this.fenceConfigError = e;
} }
@ -96,4 +120,12 @@ public String getNameServiceId() {
public String getNameNodeId() { public String getNameNodeId() {
return this.nnId; return this.nnId;
} }
@Override
protected void addFencingParameters(Map<String, String> ret) {
super.addFencingParameters(ret);
ret.put(NAMESERVICE_ID_KEY, getNameServiceId());
ret.put(NAMENODE_ID_KEY, getNameNodeId());
}
} }
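Putting the pieces together, a shell fencing method for an NN target sees target_host, target_port, target_address, target_nameserviceid and target_namenodeid in its environment. A rough sketch of inspecting those parameters directly; the host name and the 2012-era config key string are assumptions:

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;

public class NNFencingParamsSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Hypothetical address for namenode nn1 in nameservice ns1 (2012-era key).
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020");

    NNHAServiceTarget target = new NNHAServiceTarget(conf, "ns1", "nn1");
    for (Map.Entry<String, String> e : target.getFencingParameters().entrySet()) {
      // ShellCommandFencer prepends "target_" and maps '.' to '_' for each key.
      System.out.println("target_" + e.getKey().replace('.', '_') + "=" + e.getValue());
    }
  }
}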

View File

@ -731,7 +731,9 @@ private void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
Preconditions.checkArgument(!dstDir.equals(srcDir)); Preconditions.checkArgument(!dstDir.equals(srcDir));
File dstDirF = new File(dstDir); File dstDirF = new File(dstDir);
if (dstDirF.exists()) { if (dstDirF.exists()) {
Files.deleteRecursively(dstDirF); if (!FileUtil.fullyDelete(dstDirF)) {
throw new IOException("Unable to delete: " + dstDirF);
}
} }
LOG.info("Copying namedir from primary node dir " LOG.info("Copying namedir from primary node dir "
+ srcDir + " to " + dstDir); + srcDir + " to " + dstDir);

View File

@ -180,6 +180,17 @@ public void testGetNameServiceIds() {
assertEquals("nn2", it.next().toString()); assertEquals("nn2", it.next().toString());
} }
@Test
public void testGetOnlyNameServiceIdOrNull() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
assertNull(DFSUtil.getOnlyNameServiceIdOrNull(conf));
conf.set(DFS_FEDERATION_NAMESERVICES, "");
assertNull(DFSUtil.getOnlyNameServiceIdOrNull(conf));
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1");
assertEquals("ns1", DFSUtil.getOnlyNameServiceIdOrNull(conf));
}
/** /**
* Test for {@link DFSUtil#getNNServiceRpcAddresses(Configuration)} * Test for {@link DFSUtil#getNNServiceRpcAddresses(Configuration)}
* {@link DFSUtil#getNameServiceIdFromAddress(Configuration, InetSocketAddress, String...) * {@link DFSUtil#getNameServiceIdFromAddress(Configuration, InetSocketAddress, String...)

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestHdfsAdmin {
private static final Path TEST_PATH = new Path("/test");
private Configuration conf = new Configuration();
private MiniDFSCluster cluster;
@Before
public void setUpCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
}
@After
public void shutDownCluster() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test that we can set and clear quotas via {@link HdfsAdmin}.
*/
@Test
public void testHdfsAdminSetQuota() throws Exception {
HdfsAdmin dfsAdmin = new HdfsAdmin(
FileSystem.getDefaultUri(conf), conf);
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(TEST_PATH));
assertEquals(-1, fs.getContentSummary(TEST_PATH).getQuota());
assertEquals(-1, fs.getContentSummary(TEST_PATH).getSpaceQuota());
dfsAdmin.setSpaceQuota(TEST_PATH, 10);
assertEquals(-1, fs.getContentSummary(TEST_PATH).getQuota());
assertEquals(10, fs.getContentSummary(TEST_PATH).getSpaceQuota());
dfsAdmin.setQuota(TEST_PATH, 10);
assertEquals(10, fs.getContentSummary(TEST_PATH).getQuota());
assertEquals(10, fs.getContentSummary(TEST_PATH).getSpaceQuota());
dfsAdmin.clearSpaceQuota(TEST_PATH);
assertEquals(10, fs.getContentSummary(TEST_PATH).getQuota());
assertEquals(-1, fs.getContentSummary(TEST_PATH).getSpaceQuota());
dfsAdmin.clearQuota(TEST_PATH);
assertEquals(-1, fs.getContentSummary(TEST_PATH).getQuota());
assertEquals(-1, fs.getContentSummary(TEST_PATH).getSpaceQuota());
} finally {
if (fs != null) {
fs.close();
}
}
}
/**
* Make sure that a non-HDFS URI throws a helpful error.
*/
@Test(expected = IllegalArgumentException.class)
public void testHdfsAdminWithBadUri() throws IOException, URISyntaxException {
new HdfsAdmin(new URI("file:///bad-scheme"), conf);
}
}

View File

@ -23,8 +23,8 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -143,8 +143,7 @@ public static int computeInvalidationWork(BlockManager bm) {
* {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to * {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
* a high value to ensure that all work is calculated. * a high value to ensure that all work is calculated.
*/ */
public static int computeAllPendingWork(BlockManager bm) public static int computeAllPendingWork(BlockManager bm) {
throws IOException {
int work = computeInvalidationWork(bm); int work = computeInvalidationWork(bm);
work += bm.computeReplicationWork(Integer.MAX_VALUE); work += bm.computeReplicationWork(Integer.MAX_VALUE);
return work; return work;

View File

@ -37,6 +37,7 @@
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -181,7 +182,9 @@ public static FSImageTransactionalStorageInspector inspectStorageDirectory(
public static FSEditLog createStandaloneEditLog(File logDir) public static FSEditLog createStandaloneEditLog(File logDir)
throws IOException { throws IOException {
assertTrue(logDir.mkdirs() || logDir.exists()); assertTrue(logDir.mkdirs() || logDir.exists());
Files.deleteDirectoryContents(logDir); if (!FileUtil.fullyDeleteContents(logDir)) {
throw new IOException("Unable to delete contents of " + logDir);
}
NNStorage storage = Mockito.mock(NNStorage.class); NNStorage storage = Mockito.mock(NNStorage.class);
StorageDirectory sd StorageDirectory sd
= FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS); = FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS);

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
@ -41,6 +42,7 @@
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.io.Files;
/** /**
* Tests for HAAdmin command with {@link MiniDFSCluster} set up in HA mode. * Tests for HAAdmin command with {@link MiniDFSCluster} set up in HA mode.
@ -59,6 +61,8 @@ public class TestDFSHAAdminMiniCluster {
private String errOutput; private String errOutput;
private int nn1Port;
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
conf = new Configuration(); conf = new Configuration();
@ -69,6 +73,8 @@ public void setup() throws IOException {
tool.setConf(conf); tool.setConf(conf);
tool.setErrOut(new PrintStream(errOutBytes)); tool.setErrOut(new PrintStream(errOutBytes));
cluster.waitActive(); cluster.waitActive();
nn1Port = cluster.getNameNodePort(0);
} }
@After @After
@ -125,8 +131,16 @@ public void testFencer() throws Exception {
// Test failover with no fencer // Test failover with no fencer
assertEquals(-1, runTool("-failover", "nn1", "nn2")); assertEquals(-1, runTool("-failover", "nn1", "nn2"));
// Set up fencer to write info about the fencing target into a
// tmp file, so we can verify that the args were substituted right
File tmpFile = File.createTempFile("testFencer", ".txt");
tmpFile.deleteOnExit();
conf.set(NodeFencer.CONF_METHODS_KEY,
"shell(echo -n $target_nameserviceid.$target_namenodeid " +
"$target_port $dfs_ha_namenode_id > " +
tmpFile.getAbsolutePath() + ")");
// Test failover with fencer // Test failover with fencer
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
tool.setConf(conf); tool.setConf(conf);
assertEquals(0, runTool("-transitionToActive", "nn1")); assertEquals(0, runTool("-transitionToActive", "nn1"));
assertEquals(0, runTool("-failover", "nn1", "nn2")); assertEquals(0, runTool("-failover", "nn1", "nn2"));
@ -134,21 +148,36 @@ public void testFencer() throws Exception {
// Test failover with fencer and nameservice // Test failover with fencer and nameservice
assertEquals(0, runTool("-ns", "minidfs-ns", "-failover", "nn2", "nn1")); assertEquals(0, runTool("-ns", "minidfs-ns", "-failover", "nn2", "nn1"));
// Fencer has not run yet, since none of the above required fencing
assertEquals("", Files.toString(tmpFile, Charsets.UTF_8));
// Test failover with fencer and forcefence option // Test failover with fencer and forcefence option
assertEquals(0, runTool("-failover", "nn1", "nn2", "--forcefence")); assertEquals(0, runTool("-failover", "nn1", "nn2", "--forcefence"));
// The fence script should run with the configuration from the target
// node, rather than the configuration from the fencing node
assertEquals("minidfs-ns.nn1 " + nn1Port + " nn1",
Files.toString(tmpFile, Charsets.UTF_8));
tmpFile.delete();
// Test failover with forceactive option // Test failover with forceactive option
assertEquals(0, runTool("-failover", "nn2", "nn1", "--forceactive")); assertEquals(0, runTool("-failover", "nn2", "nn1", "--forceactive"));
// Fencing should not occur, since it was graceful
assertFalse(tmpFile.exists());
// Test failover with no fencer and forcefence option // Test failover with no fencer and forcefence option
conf.unset(NodeFencer.CONF_METHODS_KEY); conf.unset(NodeFencer.CONF_METHODS_KEY);
tool.setConf(conf); tool.setConf(conf);
assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence")); assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence"));
assertFalse(tmpFile.exists());
// Test failover with bad fencer and forcefence option // Test failover with bad fencer and forcefence option
conf.set(NodeFencer.CONF_METHODS_KEY, "foobar!"); conf.set(NodeFencer.CONF_METHODS_KEY, "foobar!");
tool.setConf(conf); tool.setConf(conf);
assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence")); assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence"));
assertFalse(tmpFile.exists());
// Test failover with force fence listed before the other arguments // Test failover with force fence listed before the other arguments
conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)"); conf.set(NodeFencer.CONF_METHODS_KEY, "shell(true)");
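
As a rough usage sketch, the same substitution-token fencer that the test configures above could be wired up programmatically as follows. Only the configuration key and the $target_* token names come from this change; the script path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.NodeFencer;

// Hypothetical sketch: a shell fencing method that relies on the $target_*
// substitution tokens exercised by the test above. The script path is
// invented for illustration.
public class ShellFencerConfSketch {
  public static Configuration withShellFencer() {
    Configuration conf = new Configuration();
    conf.set(NodeFencer.CONF_METHODS_KEY,
        "shell(/path/to/fence-nn.sh --port=$target_port $target_host)");
    return conf;
  }
}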

View File

@ -139,6 +139,8 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-3955. Change MR to use ProtobufRpcEngine from hadoop-common MAPREDUCE-3955. Change MR to use ProtobufRpcEngine from hadoop-common
instead of ProtoOverHadoopRpcEngine. (Jitendra Nath Pandey via sseth) instead of ProtoOverHadoopRpcEngine. (Jitendra Nath Pandey via sseth)
MAPREDUCE-4103. Fix HA docs for changes to shell command fencer args (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -198,6 +200,8 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4095. TestJobInProgress#testLocality uses a bogus topology. MAPREDUCE-4095. TestJobInProgress#testLocality uses a bogus topology.
(Colin Patrick McCabe via eli) (Colin Patrick McCabe via eli)
MAPREDUCE-4098. TestMRApps testSetClasspath fails (tucu)
Release 0.23.3 - UNRELEASED Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -240,6 +244,16 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4020. Web services returns incorrect JSON for deep queue tree MAPREDUCE-4020. Web services returns incorrect JSON for deep queue tree
(Anupam Seth via tgraves) (Anupam Seth via tgraves)
MAPREDUCE-3672. Killed maps shouldn't be counted towards
JobCounter.NUM_FAILED_MAPS. (Anupam Seth via tgraves)
MAPREDUCE-3682 Tracker URL says AM tasks run on localhost.
(Ravi Prakash via tgraves)
MAPREDUCE-3082. Archive command take wrong path for input file with current
directory (John George via bobby)
MAPREDUCE-3650. testGetTokensForHftpFS() fails (Ravi Prakash via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -643,7 +643,8 @@ private final class ContainerAllocatorRouter extends AbstractService
public synchronized void start() { public synchronized void start() {
if (job.isUber()) { if (job.isUber()) {
this.containerAllocator = new LocalContainerAllocator( this.containerAllocator = new LocalContainerAllocator(
this.clientService, this.context); this.clientService, this.context, nmHost, nmPort, nmHttpPort
, containerID);
} else { } else {
this.containerAllocator = new RMContainerAllocator( this.containerAllocator = new RMContainerAllocator(
this.clientService, this.context); this.clientService, this.context);

View File

@ -989,6 +989,23 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
return jce; return jce;
} }
private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
long slotMillisIncrement = computeSlotMillis(taskAttempt);
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
} else {
jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
}
return jce;
}
private static private static
TaskAttemptUnsuccessfulCompletionEvent TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
@ -1214,8 +1231,13 @@ public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState); finalState);
taskAttempt.eventHandler if(finalState == TaskAttemptState.FAILED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt)); .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
} else if(finalState == TaskAttemptState.KILLED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
}
taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce)); taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else { } else {
@ -1441,7 +1463,7 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.setFinishTime(); taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) { if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt)); .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptState.KILLED); TaskAttemptState.KILLED);

View File

@ -65,14 +65,23 @@ public class LocalContainerAllocator extends RMCommunicator
private AtomicInteger containerCount = new AtomicInteger(); private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval; private long retryInterval;
private long retrystartTime; private long retrystartTime;
private String nmHost;
private int nmPort;
private int nmHttpPort;
private ContainerId containerId;
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
public LocalContainerAllocator(ClientService clientService, public LocalContainerAllocator(ClientService clientService,
AppContext context) { AppContext context, String nmHost, int nmPort, int nmHttpPort
, ContainerId cId) {
super(clientService, context); super(clientService, context);
this.eventHandler = context.getEventHandler(); this.eventHandler = context.getEventHandler();
this.nmHost = nmHost;
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.containerId = cId;
} }
@Override @Override
@ -131,17 +140,17 @@ public void handle(ContainerAllocatorEvent event) {
LOG.info("Processing the event " + event.toString()); LOG.info("Processing the event " + event.toString());
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class); ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
cID.setApplicationAttemptId(applicationAttemptId); cID.setApplicationAttemptId(applicationAttemptId);
// use negative ids to denote that these are local. Need a better way ?? // Assign the same container ID as the AM
cID.setId((-1) * containerCount.getAndIncrement()); cID.setId(this.containerId.getId());
Container container = recordFactory.newRecordInstance(Container.class); Container container = recordFactory.newRecordInstance(Container.class);
container.setId(cID); container.setId(cID);
NodeId nodeId = Records.newRecord(NodeId.class); NodeId nodeId = Records.newRecord(NodeId.class);
nodeId.setHost("localhost"); nodeId.setHost(this.nmHost);
nodeId.setPort(1234); nodeId.setPort(this.nmPort);
container.setNodeId(nodeId); container.setNodeId(nodeId);
container.setContainerToken(null); container.setContainerToken(null);
container.setNodeHttpAddress("localhost:8042"); container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
// send the container-assigned event to task attempt // send the container-assigned event to task attempt
if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) { if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test; import org.junit.Test;
@ -130,15 +131,12 @@ public class TestMRApps {
Job job = Job.getInstance(); Job job = Job.getInstance();
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration()); MRApps.setClasspath(environment, job.getConfiguration());
assertEquals("$PWD:$HADOOP_CONF_DIR:" + assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
"$HADOOP_COMMON_HOME/share/hadoop/common/*:" + String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" + if (confClasspath != null) {
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" + confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:" + }
"$YARN_HOME/share/hadoop/mapreduce/*:" + assertTrue(environment.get("CLASSPATH").contains(confClasspath));
"$YARN_HOME/share/hadoop/mapreduce/lib/*:" +
"job.jar:$PWD/*",
environment.get("CLASSPATH"));
} }
@Test public void testSetClasspathWithUserPrecendence() { @Test public void testSetClasspathWithUserPrecendence() {

View File

@ -27,6 +27,8 @@
public enum JobCounter { public enum JobCounter {
NUM_FAILED_MAPS, NUM_FAILED_MAPS,
NUM_FAILED_REDUCES, NUM_FAILED_REDUCES,
NUM_KILLED_MAPS,
NUM_KILLED_REDUCES,
TOTAL_LAUNCHED_MAPS, TOTAL_LAUNCHED_MAPS,
TOTAL_LAUNCHED_REDUCES, TOTAL_LAUNCHED_REDUCES,
OTHER_LOCAL_MAPS, OTHER_LOCAL_MAPS,

View File

@ -16,6 +16,8 @@ CounterGroupName= Job Counters
NUM_FAILED_MAPS.name= Failed map tasks NUM_FAILED_MAPS.name= Failed map tasks
NUM_FAILED_REDUCES.name= Failed reduce tasks NUM_FAILED_REDUCES.name= Failed reduce tasks
NUM_KILLED_MAPS.name= Killed map tasks
NUM_KILLED_REDUCES.name= Killed reduce tasks
TOTAL_LAUNCHED_MAPS.name= Launched map tasks TOTAL_LAUNCHED_MAPS.name= Launched map tasks
TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
OTHER_LOCAL_MAPS.name= Other local map tasks OTHER_LOCAL_MAPS.name= Other local map tasks
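
For reference, a sketch (job setup elided and assumed) of how a client could read the new killed-task counters through the mapred API, much as the tests further below do:

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.JobCounter;

// Sketch only: assumes the JobConf has been populated elsewhere.
public class KilledCounterSketch {
  public static void printKilledCounts(JobConf jc) throws Exception {
    RunningJob job = new JobClient(jc).submitJob(jc);
    job.waitForCompletion();
    Counters counters = job.getCounters();
    System.out.println("killed maps: "
        + counters.getCounter(JobCounter.NUM_KILLED_MAPS));
    System.out.println("killed reduces: "
        + counters.getCounter(JobCounter.NUM_KILLED_REDUCES));
  }
}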

View File

@ -195,6 +195,8 @@ private void testFailedJob(String fileName,
RunningJob job = jobClient.submitJob(jc); RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID(); JobID id = job.getID();
job.waitForCompletion(); job.waitForCompletion();
Counters counters = job.getCounters();
assertTrue("No. of failed maps should be 1",counters.getCounter(JobCounter.NUM_FAILED_MAPS) == 1);
if (fileName != null) { if (fileName != null) {
Path testFile = new Path(outDir, fileName); Path testFile = new Path(outDir, fileName);
@ -241,6 +243,9 @@ private void testKilledJob(String fileName,
job.waitForCompletion(); // wait for the job to complete job.waitForCompletion(); // wait for the job to complete
counters = job.getCounters();
assertTrue("No. of killed maps should be 1", counters.getCounter(JobCounter.NUM_KILLED_MAPS) == 1);
if (fileName != null) { if (fileName != null) {
Path testFile = new Path(outDir, fileName); Path testFile = new Path(outDir, fileName);
assertTrue("File " + testFile + " missing for job " + id, assertTrue("File " + testFile + " missing for job " + id,

View File

@ -234,8 +234,10 @@ public void testSpeculativeExecution() throws Exception {
.getValue()); .getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
.getValue()); .getValue());
Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS) Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
.getValue()); .getValue());
Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_KILLED_MAPS)
.getValue());
/*---------------------------------------------------------------------- /*----------------------------------------------------------------------
* Test that Reducer speculates if REDUCE_SPECULATIVE is true and * Test that Reducer speculates if REDUCE_SPECULATIVE is true and

View File

@ -322,13 +322,40 @@ HDFS High Availability
The string between '(' and ')' is passed directly to a bash shell and may not The string between '(' and ')' is passed directly to a bash shell and may not
include any closing parentheses. include any closing parentheses.
When executed, the first argument to the configured script will be the address
of the NameNode to be fenced, followed by all arguments specified in the
configuration.
The shell command will be run with an environment set up to contain all of the The shell command will be run with an environment set up to contain all of the
current Hadoop configuration variables, with the '_' character replacing any current Hadoop configuration variables, with the '_' character replacing any
'.' characters in the configuration keys. The configuration used has already had
any namenode-specific configurations promoted to their generic forms -- for example
<<dfs_namenode_rpc-address>> will contain the RPC address of the target node, even
though the configuration may specify that variable as
<<dfs.namenode.rpc-address.ns1.nn1>>.
Additionally, the following variables referring to the target node to be fenced
are also available:
*-----------------------:-----------------------------------+
| $target_host | hostname of the node to be fenced |
*-----------------------:-----------------------------------+
| $target_port | IPC port of the node to be fenced |
*-----------------------:-----------------------------------+
| $target_address | the above two, combined as host:port |
*-----------------------:-----------------------------------+
| $target_nameserviceid | the nameservice ID of the NN to be fenced |
*-----------------------:-----------------------------------+
| $target_namenodeid | the namenode ID of the NN to be fenced |
*-----------------------:-----------------------------------+
These environment variables may also be used as substitutions in the shell
command itself. For example:
---
<property>
<name>dfs.ha.fencing.methods</name>
<value>shell(/path/to/my/script.sh --nameservice=$target_nameserviceid $target_host:$target_port)</value>
</property>
---
If the shell command returns an exit
code of 0, the fencing is determined to be successful. If it returns any other code of 0, the fencing is determined to be successful. If it returns any other
exit code, the fencing was not successful and the next fencing method in the exit code, the fencing was not successful and the next fencing method in the
list will be attempted. list will be attempted.

View File

@ -321,7 +321,7 @@ public void testGetTokensForHftpFS() throws IOException, URISyntaxException {
final Token<DelegationTokenIdentifier> t = final Token<DelegationTokenIdentifier> t =
new Token<DelegationTokenIdentifier>(dtId, dtSecretManager); new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
final URI uri = new URI("hftp://host:2222/file1"); final URI uri = new URI("hftp://127.0.0.1:2222/file1");
final String fs_addr = final String fs_addr =
SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT); SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
t.setService(new Text(fs_addr)); t.setService(new Text(fs_addr));

View File

@ -253,7 +253,7 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>r09</version> <version>11.0.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-cli</groupId> <groupId>commons-cli</groupId>

View File

@ -830,11 +830,18 @@ public int run(String[] args) throws Exception {
throw new IOException("Parent path not specified."); throw new IOException("Parent path not specified.");
} }
parentPath = new Path(args[i+1]); parentPath = new Path(args[i+1]);
if (!parentPath.isAbsolute()) {
parentPath= parentPath.getFileSystem(getConf()).makeQualified(parentPath);
}
i+=2; i+=2;
//read the rest of the paths //read the rest of the paths
for (; i < args.length; i++) { for (; i < args.length; i++) {
if (i == (args.length - 1)) { if (i == (args.length - 1)) {
destPath = new Path(args[i]); destPath = new Path(args[i]);
if (!destPath.isAbsolute()) {
destPath = destPath.getFileSystem(getConf()).makeQualified(destPath);
}
} }
else { else {
Path argPath = new Path(args[i]); Path argPath = new Path(args[i]);
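
A small illustrative sketch (path and method names are examples only) of the qualification step the tool now applies to relative parent and destination paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Example only: qualify a possibly-relative CLI path against its FileSystem,
// mirroring what the archive tool now does for -p and the destination.
public class QualifyPathSketch {
  public static Path qualify(String arg, Configuration conf) throws Exception {
    Path p = new Path(arg);
    if (!p.isAbsolute()) {
      p = p.getFileSystem(conf).makeQualified(p);
    }
    return p;
  }
}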

View File

@ -104,6 +104,41 @@ protected void tearDown() throws Exception {
} }
public void testRelativePath() throws Exception {
fs.delete(archivePath, true);
final Path sub1 = new Path(inputPath, "dir1");
fs.mkdirs(sub1);
createFile(sub1, "a", fs);
final Configuration conf = mapred.createJobConf();
final FsShell shell = new FsShell(conf);
final List<String> originalPaths = lsr(shell, "input");
System.out.println("originalPath: " + originalPaths);
final URI uri = fs.getUri();
final String prefix = "har://hdfs-" + uri.getHost() +":" + uri.getPort()
+ archivePath.toUri().getPath() + Path.SEPARATOR;
{
final String harName = "foo.har";
final String[] args = {
"-archiveName",
harName,
"-p",
"input",
"*",
"archive"
};
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH, HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(mapred.createJobConf());
assertEquals(0, ToolRunner.run(har, args));
//compare results
final List<String> harPaths = lsr(shell, prefix + harName);
assertEquals(originalPaths, harPaths);
}
}
public void testPathWithSpaces() throws Exception { public void testPathWithSpaces() throws Exception {
fs.delete(archivePath, true); fs.delete(archivePath, true);
@ -170,8 +205,11 @@ private static List<String> lsr(final FsShell shell, String dir
System.setErr(oldErr); System.setErr(oldErr);
} }
System.out.println("lsr results:\n" + results); System.out.println("lsr results:\n" + results);
String dirname = dir;
if (dir.lastIndexOf(Path.SEPARATOR) != -1 ) {
dirname = dir.substring(dir.lastIndexOf(Path.SEPARATOR));
}
final String dirname = dir.substring(dir.lastIndexOf(Path.SEPARATOR));
final List<String> paths = new ArrayList<String>(); final List<String> paths = new ArrayList<String>();
for(StringTokenizer t = new StringTokenizer(results, "\n"); for(StringTokenizer t = new StringTokenizer(results, "\n");
t.hasMoreTokens(); ) { t.hasMoreTokens(); ) {