Remove extra code that was erroneously committed in HDFS-3934 (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489083 13f79535-47bb-0310-9956-ffa450edef68
parent c1b635ed48
commit 4ee1ec2126
org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -251,9 +251,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
-  public static final String DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS =
-      "dfs.namenode.hosts.dns.resolution.interval.seconds";
-  public static final int DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT = 300;
   public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
   public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
   public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
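The two removed constants follow the usual DFSConfigKeys pattern: a string key paired with a typed default, read back through Configuration. A minimal sketch of how such a pair is consumed (the key name and default are copied from the removed lines; the surrounding class is illustrative, not Hadoop code):

    import org.apache.hadoop.conf.Configuration;

    public class DnsIntervalExample {
      // Key name and default value copied from the removed DFSConfigKeys lines.
      static final String KEY = "dfs.namenode.hosts.dns.resolution.interval.seconds";
      static final int DEFAULT = 300;

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // getInt returns the default when the key is absent from the config.
        int seconds = conf.getInt(KEY, DEFAULT);
        System.out.println("DNS re-resolution interval: " + seconds + "s");
      }
    }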
org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -130,7 +130,7 @@ public class DatanodeManager {
   private final int defaultIpcPort;
 
   /** Read include/exclude files*/
-  private final HostFileManager hostFileManager;
+  private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
   private final long heartbeatExpireInterval;
@@ -170,11 +170,6 @@ public class DatanodeManager {
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
 
-    int dnsResolutionSeconds = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS,
-        DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS_DEFAULT);
-    this.hostFileManager = new HostFileManager(dnsResolutionSeconds);
-
     networktopology = NetworkTopology.getInstance(conf);
 
@@ -296,7 +291,6 @@ public class DatanodeManager {
   }
 
   void close() {
-    IOUtils.cleanup(LOG, hostFileManager);
     if (decommissionthread != null) {
       decommissionthread.interrupt();
       try {
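The close() hunk drops the IOUtils.cleanup call because HostFileManager no longer implements Closeable. For reference, that helper closes each argument null-safely and logs rather than propagates IOExceptions; a plain-Java sketch of the same behavior (the class and method here are illustrative, not Hadoop API):

    import java.io.Closeable;
    import java.io.IOException;

    public class CleanupSketch {
      // Mirrors the contract of org.apache.hadoop.io.IOUtils.cleanup(Log, Closeable...):
      // skip nulls, close everything, and log failures instead of throwing.
      static void cleanup(Closeable... closeables) {
        for (Closeable c : closeables) {
          if (c == null) {
            continue;
          }
          try {
            c.close();
          } catch (IOException e) {
            System.err.println("Exception in closing " + c + ": " + e);
          }
        }
      }
    }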
org/apache/hadoop/hdfs/server/namenode/HostFileManager.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -26,18 +25,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.util.HostsFileReader;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * This class manages the include and exclude files for HDFS.
  *
@@ -74,12 +67,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * The "registration name" feature is a little odd and it may be removed in the
  * future (I hope?)
  */
-public class HostFileManager implements Closeable {
+public class HostFileManager {
   private static final Log LOG = LogFactory.getLog(HostFileManager.class);
 
-  @VisibleForTesting
-  public static boolean dnsResolutionDisabledForTesting = false;
-
   public static class Entry {
     /**
      * This what the user put on the line before the colon, or the whole line
@@ -98,38 +88,6 @@ public class HostFileManager implements Closeable {
      * empty string.
      */
     private final String ipAddress;
 
-    public Entry(String prefix, int port, String ipAddress) {
-      this.prefix = prefix;
-      this.port = port;
-      this.ipAddress = ipAddress;
-    }
-
-    public String getIdentifier() {
-      return ipAddress.isEmpty() ? prefix : ipAddress;
-    }
-
-    static Entry fromPrefixAndPort(String fileName, String prefix, int port) {
-      String ipAddress = "";
-      try {
-        if (dnsResolutionDisabledForTesting) {
-          throw new UnknownHostException("dns resolution disabled for " +
-              "testing");
-        }
-        // Let's see if we can resolve this prefix to an IP address.
-        // This may fail; one example is with a registered hostname
-        // which is not actually a real DNS name.
-        InetAddress addr = InetAddress.getByName(prefix);
-        ipAddress = addr.getHostAddress();
-      } catch (UnknownHostException e) {
-        if (fileName != null) {
-          LOG.info("When reading " + fileName + ", could not look up " +
-              "IP address for " + prefix + ". We will assume this is a " +
-              "registration name.", e);
-        }
-      }
-      return new Entry(prefix, port, ipAddress);
-    }
-
     /**
      * Parse a hosts file Entry.
@@ -137,6 +95,7 @@ public class HostFileManager implements Closeable {
     static Entry parse(String fileName, String entry) throws IOException {
       final String prefix;
       final int port;
+      String ipAddress = "";
 
       int idx = entry.indexOf(':');
       if (-1 == idx) {
@@ -152,7 +111,28 @@ public class HostFileManager implements Closeable {
             "'" + entry + "'", e);
         }
       }
-      return Entry.fromPrefixAndPort(fileName, prefix, port);
+      try {
+        // Let's see if we can resolve this prefix to an IP address.
+        // This may fail; one example is with a registered hostname
+        // which is not actually a real DNS name.
+        InetAddress addr = InetAddress.getByName(prefix);
+        ipAddress = addr.getHostAddress();
+      } catch (UnknownHostException e) {
+        LOG.info("When reading " + fileName + ", could not look up " +
+            "IP address for " + prefix + ". We will assume this is a " +
+            "registration name.", e);
+      }
+      return new Entry(prefix, port, ipAddress);
+    }
+
+    public String getIdentifier() {
+      return ipAddress.isEmpty() ? prefix : ipAddress;
+    }
+
+    public Entry(String prefix, int port, String ipAddress) {
+      this.prefix = prefix;
+      this.port = port;
+      this.ipAddress = ipAddress;
     }
 
     public String getPrefix() {
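With fromPrefixAndPort gone, parse resolves the prefix inline, keeping the empty string as the "could not resolve, treat it as a registration name" marker. The same resolve-or-fall-back idiom in isolation (plain JDK; the class and method names are illustrative):

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class ResolveOrKeep {
      /**
       * Try to resolve a host to an IP address; return the empty string on
       * failure so the caller can fall back to using the raw name, the way
       * Entry.getIdentifier() does in the diff above.
       */
      static String tryResolve(String host) {
        try {
          return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
          return "";
        }
      }

      public static void main(String[] args) {
        String ip = tryResolve("localhost");
        System.out.println(ip.isEmpty() ? "unresolved" : ip);
      }
    }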
@@ -293,47 +273,7 @@ public class HostFileManager implements Closeable {
   private EntrySet includes = new EntrySet();
   private EntrySet excludes = new EntrySet();
 
-  private final ScheduledThreadPoolExecutor executor
-      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("HostFileManagerDnsRefresh thread").
-          build());
-
-  private final ScheduledFuture<?> dnsResolverFuture;
-
-  private class DnsResolver implements Runnable {
-    @Override
-    public void run() {
-      EntrySet oldIncludes, oldExcludes;
-      synchronized (HostFileManager.this) {
-        oldIncludes = includes;
-        oldExcludes = excludes;
-      }
-      MutableEntrySet newIncludes = new MutableEntrySet();
-      for (Entry e : oldIncludes) {
-        newIncludes.add(Entry.fromPrefixAndPort(null, e.prefix, e.port));
-      }
-      MutableEntrySet newExcludes = new MutableEntrySet();
-      for (Entry e : oldExcludes) {
-        newExcludes.add(Entry.fromPrefixAndPort(null, e.prefix, e.port));
-      }
-      synchronized (HostFileManager.this) {
-        // Don't replace an entry set that has already been replaced by
-        // refresh().
-        if (includes == oldIncludes) {
-          includes = newIncludes;
-        }
-        if (excludes == oldExcludes) {
-          excludes = newExcludes;
-        }
-      }
-    }
-  }
-
-  public HostFileManager(int dnsResolutionSeconds) {
-    this.dnsResolverFuture = this.executor.
-        scheduleAtFixedRate(new DnsResolver(),
-            dnsResolutionSeconds, dnsResolutionSeconds,
-            TimeUnit.SECONDS);
+  public HostFileManager() {
   }
 
   public void refresh(String includeFile, String excludeFile)
@@ -415,9 +355,4 @@ public class HostFileManager implements Closeable {
   public synchronized EntrySet getExcludes() {
     return excludes;
   }
-
-  @Override
-  public synchronized void close() throws IOException {
-    dnsResolverFuture.cancel(false);
-  }
 }
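Most of what the last two hunks delete is the background refresh machinery: a single-threaded daemon executor that periodically re-resolved the include/exclude entries, cancelled from close(). Stripped of the HDFS specifics, the scheduling pattern looks like this (a standalone sketch using only java.util.concurrent and Guava, with illustrative names):

    import java.io.Closeable;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class PeriodicRefresher implements Closeable {
      // One daemon thread, named for debuggability, as in the removed field.
      private final ScheduledThreadPoolExecutor executor =
          new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
              .setDaemon(true).setNameFormat("periodic-refresh").build());
      private final ScheduledFuture<?> future;

      public PeriodicRefresher(Runnable task, int intervalSeconds) {
        // First run after one interval, then repeat at the same interval.
        this.future = executor.scheduleAtFixedRate(
            task, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
      }

      @Override
      public void close() {
        // cancel(false): let an in-flight run finish, as the removed close() did.
        future.cancel(false);
        executor.shutdown();
      }
    }

The removed DnsResolver task additionally guarded its swap with synchronized blocks so a concurrent refresh() would win over a stale background result; the executor pattern above is independent of that detail.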
org/apache/hadoop/hdfs/TestDecommission.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
@@ -79,7 +78,6 @@ public class TestDecommission {
     Path dir = new Path(workingDir, System.getProperty("test.build.data", "target/test/data") + "/work-dir/decommission");
     hostsFile = new Path(dir, "hosts");
     excludeFile = new Path(dir, "exclude");
-    HostFileManager.dnsResolutionDisabledForTesting = false;
 
     // Setup conf
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
@@ -101,7 +99,6 @@ public class TestDecommission {
     if (cluster != null) {
       cluster.shutdown();
     }
-    HostFileManager.dnsResolutionDisabledForTesting = false;
   }
 
   private void writeConfigFile(Path name, ArrayList<String> nodes)
@@ -696,53 +693,4 @@ public class TestDecommission {
       Thread.sleep(HEARTBEAT_INTERVAL * 1000);
     }
   }
-
-  @Test(timeout=360000)
-  public void testBackgroundDnsResolution() throws IOException,
-      InterruptedException {
-    // Create cluster
-    Configuration hdfsConf = new Configuration(conf);
-    hdfsConf.setInt(
-        DFSConfigKeys.DFS_NAMENODE_HOSTS_DNS_RESOLUTION_INTERVAL_SECONDS, 1);
-    cluster = new MiniDFSCluster.Builder(hdfsConf)
-        .numDataNodes(1).checkDataNodeHostConfig(true)
-        .setupHostsFile(true).build();
-    cluster.waitActive();
-
-    // Set up an includes file with just 127.0.0.1
-    ArrayList<String> nodes = new ArrayList<String>();
-    String localhost = java.net.InetAddress.getLocalHost().getHostName();
-    nodes.add(localhost);
-    writeConfigFile(hostsFile, nodes);
-    HostFileManager.dnsResolutionDisabledForTesting = true;
-    refreshNodes(cluster.getNamesystem(0), hdfsConf);
-
-    // The DN will be marked dead because DNS resolution was turned off,
-    // and we are in the include file by hostname only.
-    DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
-      if (info.length == 1) {
-        break;
-      }
-      LOG.info("Waiting for datanode to be marked dead");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
-    }
-
-    // Allow hostname resolution
-    HostFileManager.dnsResolutionDisabledForTesting = false;
-    cluster.restartDataNode(0);
-
-    // Wait for the DN to come back.
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
-      if (info.length == 1) {
-        Assert.assertFalse(info[0].isDecommissioned());
-        Assert.assertFalse(info[0].isDecommissionInProgress());
-        break;
-      }
-      LOG.info("Waiting for datanode to come back");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
-    }
-  }
 }
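The deleted test is built around a poll-until-state loop over client.datanodeReport. The loop shape, pulled out as a generic helper (a sketch; the deadline handling is an addition, everything else mirrors the deleted loops):

    public class PollUntil {
      interface Check {
        boolean ok() throws Exception;
      }

      // Poll the check at a fixed interval, as the deleted test polled
      // datanodeReport(...) once per heartbeat, and fail past a deadline.
      static void pollUntil(Check check, long intervalMs, long timeoutMs)
          throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!check.ok()) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(intervalMs);
        }
      }
    }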