From 328fc86bdbf84fcc80a0920b2cacfc2e74ac5c9f Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Mon, 24 Mar 2014 22:16:48 +0000 Subject: [PATCH] HDFS-5846. Shuffle phase is slow in Windows - FadviseFileRegion::transferTo does not read disks efficiently. Contributed by Nikola Vujic. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581091 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +- .../blockmanagement/DatanodeManager.java | 67 +++++++++++++--- .../UnresolvedTopologyException.java | 34 ++++++++ .../src/main/resources/hdfs-default.xml | 17 +++- .../blockmanagement/TestDatanodeManager.java | 77 +++++++++++++++++-- 6 files changed, 188 insertions(+), 18 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnresolvedTopologyException.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f08d51ae61..f4308f9b95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -680,6 +680,9 @@ Release 2.4.0 - UNRELEASED HDFS-6135. In HDFS upgrade with HA setup, JournalNode cannot handle layout version bump when rolling back. (jing9) + HDFS-5846. Assigning DEFAULT_RACK in resolveNetworkLocation method can break + data resiliency. (Nikola Vujic via cnauroth) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5a14f984ff..fc7af0d10a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -610,7 +610,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500; public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis"; public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000; - + + // Handling unresolved DN topology mapping + public static final String DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY = + "dfs.namenode.reject-unresolved-dn-topology-mapping"; + public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT = + false; + // hedged read properties public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS = "dfs.client.hedged.read.threshold.millis"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index e1f424f277..6af4ace514 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -98,6 +98,7 @@ public class DatanodeManager { private final Host2NodesMap host2DatanodeMap = new Host2NodesMap(); private final DNSToSwitchMapping dnsToSwitchMapping; + private final boolean rejectUnresolvedTopologyDN; private final int defaultXferPort; @@ -201,6 +202,10 @@ public class DatanodeManager { conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); + this.rejectUnresolvedTopologyDN = conf.getBoolean( + DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, + DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT); + // If the dns to switch mapping supports cache, resolve network // locations of those hosts in the include list and store the mapping // in the cache; so future calls to resolve will be fast. @@ -391,7 +396,8 @@ DatanodeDescriptor getDatanodeDescriptor(String address) { node = getDatanodeByHost(host); } if (node == null) { - String networkLocation = resolveNetworkLocation(dnId); + String networkLocation = + resolveNetworkLocationWithFallBackToDefaultLocation(dnId); // If the current cluster doesn't contain the node, fallback to // something machine local and then rack local. @@ -626,9 +632,36 @@ public HashMap getDatanodesSoftwareVersions() { return new HashMap (this.datanodesSoftwareVersions); } } - - /* Resolve a node's network location */ - private String resolveNetworkLocation (DatanodeID node) { + + /** + * Resolve a node's network location. If the DNS to switch mapping fails + * then this method guarantees default rack location. + * @param node to resolve to network location + * @return network location path + */ + private String resolveNetworkLocationWithFallBackToDefaultLocation ( + DatanodeID node) { + String networkLocation; + try { + networkLocation = resolveNetworkLocation(node); + } catch (UnresolvedTopologyException e) { + LOG.error("Unresolved topology mapping. Using " + + NetworkTopology.DEFAULT_RACK + " for host " + node.getHostName()); + networkLocation = NetworkTopology.DEFAULT_RACK; + } + return networkLocation; + } + + /** + * Resolve a node's network location. If the DNS to switch mapping fails, + * then this method throws UnresolvedTopologyException. + * @param node to resolve to network location + * @return network location path. + * @throws UnresolvedTopologyException if the DNS to switch mapping fails + * to resolve network location. + */ + private String resolveNetworkLocation (DatanodeID node) + throws UnresolvedTopologyException { List names = new ArrayList(1); if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { names.add(node.getIpAddr()); @@ -640,9 +673,9 @@ private String resolveNetworkLocation (DatanodeID node) { List rName = dnsToSwitchMapping.resolve(names); String networkLocation; if (rName == null) { - LOG.error("The resolve call returned null! Using " + - NetworkTopology.DEFAULT_RACK + " for host " + names); - networkLocation = NetworkTopology.DEFAULT_RACK; + LOG.error("The resolve call returned null!"); + throw new UnresolvedTopologyException( + "Unresolved topology mapping for host " + node.getHostName()); } else { networkLocation = rName.get(0); } @@ -755,9 +788,11 @@ void stopDecommission(DatanodeDescriptor node) { * @param nodeReg the datanode registration * @throws DisallowedDatanodeException if the registration request is * denied because the datanode does not match includes/excludes + * @throws UnresolvedTopologyException if the registration request is + * denied because resolving datanode network location fails. */ public void registerDatanode(DatanodeRegistration nodeReg) - throws DisallowedDatanodeException { + throws DisallowedDatanodeException, UnresolvedTopologyException { InetAddress dnAddress = Server.getRemoteIp(); if (dnAddress != null) { // Mostly called inside an RPC, update ip and peer hostname @@ -839,7 +874,13 @@ nodes with its data cleared (or user can just remove the StorageID nodeS.setDisallowed(false); // Node is in the include list // resolve network location - nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + if(this.rejectUnresolvedTopologyDN) + { + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + } else { + nodeS.setNetworkLocation( + resolveNetworkLocationWithFallBackToDefaultLocation(nodeS)); + } getNetworkTopology().add(nodeS); // also treat the registration message as a heartbeat @@ -861,7 +902,13 @@ nodes with its data cleared (or user can just remove the StorageID = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK); boolean success = false; try { - nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); + // resolve network location + if(this.rejectUnresolvedTopologyDN) { + nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); + } else { + nodeDescr.setNetworkLocation( + resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr)); + } networktopology.add(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnresolvedTopologyException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnresolvedTopologyException.java new file mode 100644 index 0000000000..0747131d38 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnresolvedTopologyException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; + +/** + * This exception is thrown if resolving topology path + * for a node fails. + */ +public class UnresolvedTopologyException extends IOException { + /** for java.io.Serializable */ + private static final long serialVersionUID = 1L; + + public UnresolvedTopologyException(String text) { + super(text); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index fee8a58bb0..7074dfa4f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1841,4 +1841,19 @@ - + + dfs.namenode.reject-unresolved-dn-topology-mapping + false + + If the value is set to true, then namenode will reject datanode + registration if the topology mapping for a datanode is not resolved and + NULL is returned (script defined by net.topology.script.file.name fails + to execute). Otherwise, datanode will be registered and the default rack + will be assigned as the topology path. Topology paths are important for + data resiliency, since they define fault domains. Thus it may be unwanted + behavior to allow datanode registration with the default rack if the + resolving topology failed. + + + + \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 84d0d6b580..2c65fff8b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -21,21 +21,29 @@ import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import org.mortbay.log.Log; import static org.junit.Assert.*; public class TestDatanodeManager { - + + public static final Log LOG = LogFactory.getLog(TestDatanodeManager.class); + //The number of times the registration / removal of nodes should happen final int NUM_ITERATIONS = 500; @@ -57,7 +65,7 @@ public void testNumVersionsReportedCorrect() throws IOException { Random rng = new Random(); int seed = rng.nextInt(); rng = new Random(seed); - Log.info("Using seed " + seed + " for testing"); + LOG.info("Using seed " + seed + " for testing"); //A map of the Storage IDs to the DN registration it was registered with HashMap sIdToDnReg = @@ -76,7 +84,7 @@ public void testNumVersionsReportedCorrect() throws IOException { it.next(); } DatanodeRegistration toRemove = it.next().getValue(); - Log.info("Removing node " + toRemove.getDatanodeUuid() + " ip " + + LOG.info("Removing node " + toRemove.getDatanodeUuid() + " ip " + toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion()); //Remove that random node @@ -110,7 +118,7 @@ public void testNumVersionsReportedCorrect() throws IOException { Mockito.when(dr.getSoftwareVersion()).thenReturn( "version" + rng.nextInt(5)); - Log.info("Registering node storageID: " + dr.getDatanodeUuid() + + LOG.info("Registering node storageID: " + dr.getDatanodeUuid() + ", version: " + dr.getSoftwareVersion() + ", IP address: " + dr.getXferAddr()); @@ -136,7 +144,7 @@ public void testNumVersionsReportedCorrect() throws IOException { } } for(Entry entry: mapToCheck.entrySet()) { - Log.info("Still in map: " + entry.getKey() + " has " + LOG.info("Still in map: " + entry.getKey() + " has " + entry.getValue()); } assertEquals("The map of version counts returned by DatanodeManager was" @@ -144,5 +152,62 @@ public void testNumVersionsReportedCorrect() throws IOException { mapToCheck.size()); } } + + @Test (timeout = 100000) + public void testRejectUnresolvedDatanodes() throws IOException { + //Create the DatanodeManager which will be tested + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + + Configuration conf = new Configuration(); + + //Set configuration property for rejecting unresolved topology mapping + conf.setBoolean( + DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, true); + + //set TestDatanodeManager.MyResolver to be used for topology resolving + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class); + + //create DatanodeManager + DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), + fsn, conf); + + //storageID to register. + String storageID = "someStorageID-123"; + + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID); + + try { + //Register this node + dm.registerDatanode(dr); + Assert.fail("Expected an UnresolvedTopologyException"); + } catch (UnresolvedTopologyException ute) { + LOG.info("Expected - topology is not resolved and " + + "registration is rejected."); + } catch (Exception e) { + Assert.fail("Expected an UnresolvedTopologyException"); + } + } + + /** + * MyResolver class provides resolve method which always returns null + * in order to simulate unresolved topology mapping. + */ + public static class MyResolver implements DNSToSwitchMapping { + @Override + public List resolve(List names) { + return null; + } + @Override + public void reloadCachedMappings() { + } + + @Override + public void reloadCachedMappings(List names) { + } + } }