HDFS-17546. Implementing HostsFileReader timeout (#6873)
This commit is contained in:
parent
2d5fa9e016
commit
2fbbfe3cc9
@ -33,6 +33,11 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.FutureTask;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@ -120,4 +125,37 @@ private CombinedHostsFileReader() {
|
|||||||
}
|
}
|
||||||
return allDNs;
|
return allDNs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper to call readFile with timeout via Future Tasks.
|
||||||
|
* If timeout is reached, it will throw IOException
|
||||||
|
* @param hostsFile the input json file to read from
|
||||||
|
* @param readTimeout timeout for FutureTask execution in milliseconds
|
||||||
|
* @return the set of DatanodeAdminProperties
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static DatanodeAdminProperties[]
|
||||||
|
readFileWithTimeout(final String hostsFile, final int readTimeout) throws IOException {
|
||||||
|
FutureTask<DatanodeAdminProperties[]> futureTask = new FutureTask<>(
|
||||||
|
new Callable<DatanodeAdminProperties[]>() {
|
||||||
|
@Override
|
||||||
|
public DatanodeAdminProperties[] call() throws Exception {
|
||||||
|
return readFile(hostsFile);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Thread thread = new Thread(futureTask);
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
return futureTask.get(readTimeout, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
futureTask.cancel(true);
|
||||||
|
LOG.error("refresh File read operation timed out");
|
||||||
|
throw new IOException("host file read operation timed out");
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
LOG.error("File read operation interrupted : " + e.getMessage());
|
||||||
|
throw new IOException("host file read operation timed out");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -757,6 +757,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
"dfs.namenode.hosts.provider.classname";
|
"dfs.namenode.hosts.provider.classname";
|
||||||
public static final String DFS_HOSTS = "dfs.hosts";
|
public static final String DFS_HOSTS = "dfs.hosts";
|
||||||
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
||||||
|
public static final String DFS_HOSTS_TIMEOUT = "dfs.hosts.timeout";
|
||||||
|
public static final int DFS_HOSTS_TIMEOUT_DEFAULT = 0;
|
||||||
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
|
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
|
||||||
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
|
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
|
||||||
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
|
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
|
||||||
|
@ -179,12 +179,15 @@ public Configuration getConf() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refresh() throws IOException {
|
public void refresh() throws IOException {
|
||||||
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""));
|
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
||||||
|
conf.getInt(DFSConfigKeys.DFS_HOSTS_TIMEOUT, DFSConfigKeys.DFS_HOSTS_TIMEOUT_DEFAULT)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
private void refresh(final String hostsFile) throws IOException {
|
private void refresh(final String hostsFile, final int readTimeout) throws IOException {
|
||||||
HostProperties hostProps = new HostProperties();
|
HostProperties hostProps = new HostProperties();
|
||||||
DatanodeAdminProperties[] all =
|
DatanodeAdminProperties[] all = readTimeout != DFSConfigKeys.DFS_HOSTS_TIMEOUT_DEFAULT
|
||||||
CombinedHostsFileReader.readFile(hostsFile);
|
? CombinedHostsFileReader.readFileWithTimeout(hostsFile, readTimeout)
|
||||||
|
: CombinedHostsFileReader.readFile(hostsFile);
|
||||||
for(DatanodeAdminProperties properties : all) {
|
for(DatanodeAdminProperties properties : all) {
|
||||||
InetSocketAddress addr = parseEntry(hostsFile,
|
InetSocketAddress addr = parseEntry(hostsFile,
|
||||||
properties.getHostName(), properties.getPort());
|
properties.getHostName(), properties.getPort());
|
||||||
|
@ -1133,6 +1133,13 @@
|
|||||||
excluded.</description>
|
excluded.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.hosts.timeout</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>Specifies a timeout (in milliseconds) for reading the dfs.hosts file.
|
||||||
|
A value of zero indicates no timeout to be set.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.max.objects</name>
|
<name>dfs.namenode.max.objects</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
|
@ -19,14 +19,21 @@
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
|
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for JSON based HostsFileReader.
|
* Test for JSON based HostsFileReader.
|
||||||
@ -44,8 +51,12 @@ public class TestCombinedHostsFileReader {
|
|||||||
private final File legacyFile =
|
private final File legacyFile =
|
||||||
new File(TESTCACHEDATADIR, "legacy.dfs.hosts.json");
|
new File(TESTCACHEDATADIR, "legacy.dfs.hosts.json");
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private Callable<DatanodeAdminProperties[]> callable;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
callable = Mockito.mock(Callable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -87,4 +98,50 @@ public void testEmptyCombinedHostsFileReader() throws Exception {
|
|||||||
CombinedHostsFileReader.readFile(newFile.getAbsolutePath());
|
CombinedHostsFileReader.readFile(newFile.getAbsolutePath());
|
||||||
assertEquals(0, all.length);
|
assertEquals(0, all.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When timeout is enabled, test for success when reading file within timeout
|
||||||
|
* limits
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testReadFileWithTimeoutSuccess() throws Exception {
|
||||||
|
|
||||||
|
DatanodeAdminProperties[] all = CombinedHostsFileReader.readFileWithTimeout(
|
||||||
|
jsonFile.getAbsolutePath(), 1000);
|
||||||
|
assertEquals(7, all.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When timeout is enabled, test for IOException when reading file exceeds
|
||||||
|
* timeout limits
|
||||||
|
*/
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testReadFileWithTimeoutTimeoutException() throws Exception {
|
||||||
|
when(callable.call()).thenAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CombinedHostsFileReader.readFileWithTimeout(
|
||||||
|
jsonFile.getAbsolutePath(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When timeout is enabled, test for IOException when execution is interrupted
|
||||||
|
*/
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testReadFileWithTimeoutInterruptedException() throws Exception {
|
||||||
|
when(callable.call()).thenAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
throw new InterruptedException();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CombinedHostsFileReader.readFileWithTimeout(
|
||||||
|
jsonFile.getAbsolutePath(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user