HDFS-5846. Shuffle phase is slow in Windows - FadviseFileRegion::transferTo does not read disks efficiently. Contributed by Nikola Vujic.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581091 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-03-24 22:16:48 +00:00
parent 6376cd38a1
commit 328fc86bdb
6 changed files with 188 additions and 18 deletions

View File

@ -680,6 +680,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6135. In HDFS upgrade with HA setup, JournalNode cannot handle layout
version bump when rolling back. (jing9)
HDFS-5846. Assigning DEFAULT_RACK in resolveNetworkLocation method can break
data resiliency. (Nikola Vujic via cnauroth)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -611,6 +611,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
// Handling unresolved DN topology mapping
public static final String DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY =
"dfs.namenode.reject-unresolved-dn-topology-mapping";
public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
false;
// hedged read properties
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
"dfs.client.hedged.read.threshold.millis";

View File

@ -98,6 +98,7 @@ public class DatanodeManager {
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
private final DNSToSwitchMapping dnsToSwitchMapping;
private final boolean rejectUnresolvedTopologyDN;
private final int defaultXferPort;
@ -201,6 +202,10 @@ public class DatanodeManager {
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
this.rejectUnresolvedTopologyDN = conf.getBoolean(
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY,
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT);
// If the dns to switch mapping supports cache, resolve network
// locations of those hosts in the include list and store the mapping
// in the cache; so future calls to resolve will be fast.
@ -391,7 +396,8 @@ DatanodeDescriptor getDatanodeDescriptor(String address) {
node = getDatanodeByHost(host);
}
if (node == null) {
String networkLocation = resolveNetworkLocation(dnId);
String networkLocation =
resolveNetworkLocationWithFallBackToDefaultLocation(dnId);
// If the current cluster doesn't contain the node, fallback to
// something machine local and then rack local.
@ -627,8 +633,35 @@ public HashMap<String, Integer> getDatanodesSoftwareVersions() {
}
}
/* Resolve a node's network location */
private String resolveNetworkLocation (DatanodeID node) {
/**
* Resolve a node's network location. If the DNS to switch mapping fails
* then this method guarantees default rack location.
* @param node to resolve to network location
* @return network location path
*/
private String resolveNetworkLocationWithFallBackToDefaultLocation (
DatanodeID node) {
String networkLocation;
try {
networkLocation = resolveNetworkLocation(node);
} catch (UnresolvedTopologyException e) {
LOG.error("Unresolved topology mapping. Using " +
NetworkTopology.DEFAULT_RACK + " for host " + node.getHostName());
networkLocation = NetworkTopology.DEFAULT_RACK;
}
return networkLocation;
}
/**
* Resolve a node's network location. If the DNS to switch mapping fails,
* then this method throws UnresolvedTopologyException.
* @param node to resolve to network location
* @return network location path.
* @throws UnresolvedTopologyException if the DNS to switch mapping fails
* to resolve network location.
*/
private String resolveNetworkLocation (DatanodeID node)
throws UnresolvedTopologyException {
List<String> names = new ArrayList<String>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
names.add(node.getIpAddr());
@ -640,9 +673,9 @@ private String resolveNetworkLocation (DatanodeID node) {
List<String> rName = dnsToSwitchMapping.resolve(names);
String networkLocation;
if (rName == null) {
LOG.error("The resolve call returned null! Using " +
NetworkTopology.DEFAULT_RACK + " for host " + names);
networkLocation = NetworkTopology.DEFAULT_RACK;
LOG.error("The resolve call returned null!");
throw new UnresolvedTopologyException(
"Unresolved topology mapping for host " + node.getHostName());
} else {
networkLocation = rName.get(0);
}
@ -755,9 +788,11 @@ void stopDecommission(DatanodeDescriptor node) {
* @param nodeReg the datanode registration
* @throws DisallowedDatanodeException if the registration request is
* denied because the datanode does not match includes/excludes
* @throws UnresolvedTopologyException if the registration request is
* denied because resolving datanode network location fails.
*/
public void registerDatanode(DatanodeRegistration nodeReg)
throws DisallowedDatanodeException {
throws DisallowedDatanodeException, UnresolvedTopologyException {
InetAddress dnAddress = Server.getRemoteIp();
if (dnAddress != null) {
// Mostly called inside an RPC, update ip and peer hostname
@ -839,7 +874,13 @@ nodes with its data cleared (or user can just remove the StorageID
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
if(this.rejectUnresolvedTopologyDN)
{
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
} else {
nodeS.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
}
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
@ -861,7 +902,13 @@ nodes with its data cleared (or user can just remove the StorageID
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
try {
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
} else {
nodeDescr.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
}
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
/**
* This exception is thrown if resolving topology path
* for a node fails.
*/
public class UnresolvedTopologyException extends IOException {
/** for java.io.Serializable */
private static final long serialVersionUID = 1L;
public UnresolvedTopologyException(String text) {
super(text);
}
}

View File

@ -1841,4 +1841,19 @@
</description>
</property>
<property>
<name>dfs.namenode.reject-unresolved-dn-topology-mapping</name>
<value>false</value>
<description>
If the value is set to true, then namenode will reject datanode
registration if the topology mapping for a datanode is not resolved and
NULL is returned (script defined by net.topology.script.file.name fails
to execute). Otherwise, datanode will be registered and the default rack
will be assigned as the topology path. Topology paths are important for
data resiliency, since they define fault domains. Thus it may be unwanted
behavior to allow datanode registration with the default rack if the
resolving topology failed.
</description>
</property>
</configuration>

View File

@ -21,21 +21,29 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mortbay.log.Log;
import static org.junit.Assert.*;
public class TestDatanodeManager {
public static final Log LOG = LogFactory.getLog(TestDatanodeManager.class);
//The number of times the registration / removal of nodes should happen
final int NUM_ITERATIONS = 500;
@ -57,7 +65,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
Random rng = new Random();
int seed = rng.nextInt();
rng = new Random(seed);
Log.info("Using seed " + seed + " for testing");
LOG.info("Using seed " + seed + " for testing");
//A map of the Storage IDs to the DN registration it was registered with
HashMap <String, DatanodeRegistration> sIdToDnReg =
@ -76,7 +84,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
it.next();
}
DatanodeRegistration toRemove = it.next().getValue();
Log.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
LOG.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());
//Remove that random node
@ -110,7 +118,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
Mockito.when(dr.getSoftwareVersion()).thenReturn(
"version" + rng.nextInt(5));
Log.info("Registering node storageID: " + dr.getDatanodeUuid() +
LOG.info("Registering node storageID: " + dr.getDatanodeUuid() +
", version: " + dr.getSoftwareVersion() + ", IP address: "
+ dr.getXferAddr());
@ -136,7 +144,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
}
}
for(Entry <String, Integer> entry: mapToCheck.entrySet()) {
Log.info("Still in map: " + entry.getKey() + " has "
LOG.info("Still in map: " + entry.getKey() + " has "
+ entry.getValue());
}
assertEquals("The map of version counts returned by DatanodeManager was"
@ -145,4 +153,61 @@ public void testNumVersionsReportedCorrect() throws IOException {
}
}
@Test (timeout = 100000)
public void testRejectUnresolvedDatanodes() throws IOException {
//Create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
Configuration conf = new Configuration();
//Set configuration property for rejecting unresolved topology mapping
conf.setBoolean(
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, true);
//set TestDatanodeManager.MyResolver to be used for topology resolving
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
//create DatanodeManager
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, conf);
//storageID to register.
String storageID = "someStorageID-123";
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);
try {
//Register this node
dm.registerDatanode(dr);
Assert.fail("Expected an UnresolvedTopologyException");
} catch (UnresolvedTopologyException ute) {
LOG.info("Expected - topology is not resolved and " +
"registration is rejected.");
} catch (Exception e) {
Assert.fail("Expected an UnresolvedTopologyException");
}
}
/**
* MyResolver class provides resolve method which always returns null
* in order to simulate unresolved topology mapping.
*/
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return null;
}
@Override
public void reloadCachedMappings() {
}
@Override
public void reloadCachedMappings(List<String> names) {
}
}
}