HDFS-6061. Allow dfs.datanode.shared.file.descriptor.path to contain multiple entries and fall back when needed (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9192f8446d
commit
5a3f614794
@ -22,11 +22,11 @@
|
||||
import java.io.FileDescriptor;
|
||||
|
||||
import org.apache.commons.lang.SystemUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* A factory for creating shared file descriptors inside a given directory.
|
||||
* Typically, the directory will be /dev/shm or /tmp.
|
||||
@ -45,6 +45,7 @@
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class SharedFileDescriptorFactory {
|
||||
public static final Log LOG = LogFactory.getLog(SharedFileDescriptorFactory.class);
|
||||
private final String prefix;
|
||||
private final String path;
|
||||
|
||||
@ -58,18 +59,58 @@ public static String getLoadingFailureReason() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new SharedFileDescriptorFactory.
|
||||
*
|
||||
* @param prefix The prefix to prepend to all the file names created
|
||||
* by this factory.
|
||||
* @param paths An array of paths to use. We will try each path in
|
||||
* succession, and return a factory using the first
|
||||
* usable path.
|
||||
* @return The factory.
|
||||
* @throws IOException If a factory could not be created for any reason.
|
||||
*/
|
||||
public static SharedFileDescriptorFactory create(String prefix,
|
||||
String paths[]) throws IOException {
|
||||
String loadingFailureReason = getLoadingFailureReason();
|
||||
if (loadingFailureReason != null) {
|
||||
throw new IOException(loadingFailureReason);
|
||||
}
|
||||
if (paths.length == 0) {
|
||||
throw new IOException("no SharedFileDescriptorFactory paths were " +
|
||||
"configured.");
|
||||
}
|
||||
StringBuilder errors = new StringBuilder();
|
||||
String strPrefix = "";
|
||||
for (String path : paths) {
|
||||
try {
|
||||
FileInputStream fis =
|
||||
new FileInputStream(createDescriptor0(prefix + "test", path, 1));
|
||||
fis.close();
|
||||
deleteStaleTemporaryFiles0(prefix, path);
|
||||
return new SharedFileDescriptorFactory(prefix, path);
|
||||
} catch (IOException e) {
|
||||
errors.append(strPrefix).append("Error creating file descriptor in ").
|
||||
append(path).append(": ").append(e.getMessage());
|
||||
strPrefix = ", ";
|
||||
}
|
||||
}
|
||||
throw new IOException(errors.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a SharedFileDescriptorFactory.
|
||||
*
|
||||
* @param prefix Prefix to add to all file names we use.
|
||||
* @param path Path to use.
|
||||
*/
|
||||
public SharedFileDescriptorFactory(String prefix, String path)
|
||||
throws IOException {
|
||||
Preconditions.checkState(getLoadingFailureReason() == null);
|
||||
private SharedFileDescriptorFactory(String prefix, String path) {
|
||||
this.prefix = prefix;
|
||||
this.path = path;
|
||||
deleteStaleTemporaryFiles0(prefix, path);
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,9 +20,11 @@
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.apache.commons.lang.SystemUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -36,14 +38,19 @@ public class TestSharedFileDescriptorFactory {
|
||||
private static final File TEST_BASE =
|
||||
new File(System.getProperty("test.build.data", "/tmp"));
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
Assume.assumeTrue(null ==
|
||||
SharedFileDescriptorFactory.getLoadingFailureReason());
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testReadAndWrite() throws Exception {
|
||||
Assume.assumeTrue(NativeIO.isAvailable());
|
||||
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||
File path = new File(TEST_BASE, "testReadAndWrite");
|
||||
path.mkdirs();
|
||||
SharedFileDescriptorFactory factory =
|
||||
new SharedFileDescriptorFactory("woot_", path.getAbsolutePath());
|
||||
SharedFileDescriptorFactory.create("woot_",
|
||||
new String[] { path.getAbsolutePath() });
|
||||
FileInputStream inStream =
|
||||
factory.createDescriptor("testReadAndWrite", 4096);
|
||||
FileOutputStream outStream = new FileOutputStream(inStream.getFD());
|
||||
@ -73,11 +80,34 @@ public void testCleanupRemainders() throws Exception {
|
||||
Path.SEPARATOR + "woot2_remainder2";
|
||||
createTempFile(remainder1);
|
||||
createTempFile(remainder2);
|
||||
new SharedFileDescriptorFactory("woot2_", path.getAbsolutePath());
|
||||
SharedFileDescriptorFactory.create("woot2_",
|
||||
new String[] { path.getAbsolutePath() });
|
||||
// creating the SharedFileDescriptorFactory should have removed
|
||||
// the remainders
|
||||
Assert.assertFalse(new File(remainder1).exists());
|
||||
Assert.assertFalse(new File(remainder2).exists());
|
||||
FileUtil.fullyDelete(path);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testDirectoryFallbacks() throws Exception {
|
||||
File nonExistentPath = new File(TEST_BASE, "nonexistent");
|
||||
File permissionDeniedPath = new File("/");
|
||||
File goodPath = new File(TEST_BASE, "testDirectoryFallbacks");
|
||||
goodPath.mkdirs();
|
||||
try {
|
||||
SharedFileDescriptorFactory.create("shm_",
|
||||
new String[] { nonExistentPath.getAbsolutePath(),
|
||||
permissionDeniedPath.getAbsolutePath() });
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
SharedFileDescriptorFactory factory =
|
||||
SharedFileDescriptorFactory.create("shm_",
|
||||
new String[] { nonExistentPath.getAbsolutePath(),
|
||||
permissionDeniedPath.getAbsolutePath(),
|
||||
goodPath.getAbsolutePath() } );
|
||||
Assert.assertEquals(goodPath.getAbsolutePath(), factory.getPath());
|
||||
FileUtil.fullyDelete(goodPath);
|
||||
}
|
||||
}
|
||||
|
@ -527,6 +527,9 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-6044. Add property for setting the NFS look up time for users
|
||||
(brandonli)
|
||||
|
||||
HDFS-6061. Allow dfs.datanode.shared.file.descriptor.path to contain
|
||||
multiple entries and fall back when needed (cmccabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||
|
@ -477,8 +477,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
|
||||
public static final String DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
|
||||
public static final String DFS_DATANODE_USER_NAME_KEY = "dfs.datanode.kerberos.principal";
|
||||
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH = "dfs.datanode.shared.file.descriptor.path";
|
||||
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT = "/dev/shm";
|
||||
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
|
||||
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp";
|
||||
public static final String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
|
||||
public static final int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
|
||||
public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
|
||||
|
@ -17,14 +17,15 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
@ -45,7 +46,9 @@
|
||||
import org.apache.hadoop.net.unix.DomainSocketWatcher;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/*
|
||||
* Manages client short-circuit memory segments on the DataNode.
|
||||
@ -149,38 +152,35 @@ public ShortCircuitRegistry(Configuration conf) throws IOException {
|
||||
SharedFileDescriptorFactory shmFactory = null;
|
||||
DomainSocketWatcher watcher = null;
|
||||
try {
|
||||
String loadingFailureReason =
|
||||
SharedFileDescriptorFactory.getLoadingFailureReason();
|
||||
if (loadingFailureReason != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Disabling ShortCircuitRegistry because " +
|
||||
loadingFailureReason);
|
||||
}
|
||||
return;
|
||||
}
|
||||
String shmPath = conf.get(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH,
|
||||
DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT);
|
||||
if (shmPath.isEmpty()) {
|
||||
LOG.debug("Disabling ShortCircuitRegistry because shmPath was not set.");
|
||||
return;
|
||||
}
|
||||
int interruptCheck = conf.getInt(
|
||||
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
||||
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
||||
if (interruptCheck <= 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Disabling ShortCircuitRegistry because " +
|
||||
"interruptCheckMs was set to " + interruptCheck);
|
||||
}
|
||||
return;
|
||||
throw new IOException(
|
||||
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS +
|
||||
" was set to " + interruptCheck);
|
||||
}
|
||||
String shmPaths[] =
|
||||
conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS);
|
||||
if (shmPaths.length == 0) {
|
||||
shmPaths =
|
||||
DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT.split(",");
|
||||
}
|
||||
shmFactory = SharedFileDescriptorFactory.
|
||||
create("HadoopShortCircuitShm_", shmPaths);
|
||||
String dswLoadingFailure = DomainSocketWatcher.getLoadingFailureReason();
|
||||
if (dswLoadingFailure != null) {
|
||||
throw new IOException(dswLoadingFailure);
|
||||
}
|
||||
shmFactory =
|
||||
new SharedFileDescriptorFactory("HadoopShortCircuitShm_", shmPath);
|
||||
watcher = new DomainSocketWatcher(interruptCheck);
|
||||
enabled = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("created new ShortCircuitRegistry with interruptCheck=" +
|
||||
interruptCheck + ", shmPath=" + shmPath);
|
||||
interruptCheck + ", shmPath=" + shmFactory.getPath());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Disabling ShortCircuitRegistry", e);
|
||||
}
|
||||
} finally {
|
||||
this.enabled = enabled;
|
||||
|
@ -1150,13 +1150,13 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.shared.file.descriptor.path</name>
|
||||
<value>/dev/shm</value>
|
||||
<name>dfs.datanode.shared.file.descriptor.paths</name>
|
||||
<value>/dev/shm,/tmp</value>
|
||||
<description>
|
||||
The path to use when creating file descriptors that will be shared
|
||||
between the DataNode and the DFSClient. Typically we use /dev/shm, so
|
||||
that the file descriptors will not be written to disk. Systems that
|
||||
don't have /dev/shm should use /tmp.
|
||||
A comma-separated list of paths to use when creating file descriptors that
|
||||
will be shared between the DataNode and the DFSClient. Typically we use
|
||||
/dev/shm, so that the file descriptors will not be written to disk.
|
||||
Systems that don't have /dev/shm will fall back to /tmp by default.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -22,11 +22,9 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.lang.SystemUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.ShortCircuitShm;
|
||||
@ -45,8 +43,8 @@ public class TestShortCircuitShm {
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
Assume.assumeTrue(NativeIO.isAvailable());
|
||||
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||
Assume.assumeTrue(null ==
|
||||
SharedFileDescriptorFactory.getLoadingFailureReason());
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
@ -54,7 +52,8 @@ public void testStartupShutdown() throws Exception {
|
||||
File path = new File(TEST_BASE, "testStartupShutdown");
|
||||
path.mkdirs();
|
||||
SharedFileDescriptorFactory factory =
|
||||
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
|
||||
SharedFileDescriptorFactory.create("shm_",
|
||||
new String[] { path.getAbsolutePath() } );
|
||||
FileInputStream stream =
|
||||
factory.createDescriptor("testStartupShutdown", 4096);
|
||||
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
|
||||
@ -68,7 +67,8 @@ public void testAllocateSlots() throws Exception {
|
||||
File path = new File(TEST_BASE, "testAllocateSlots");
|
||||
path.mkdirs();
|
||||
SharedFileDescriptorFactory factory =
|
||||
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
|
||||
SharedFileDescriptorFactory.create("shm_",
|
||||
new String[] { path.getAbsolutePath() });
|
||||
FileInputStream stream =
|
||||
factory.createDescriptor("testAllocateSlots", 4096);
|
||||
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
|
||||
|
Loading…
Reference in New Issue
Block a user