HDFS-5000. DataNode configuration should allow specifying storage type
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1517417 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c41df254fe
commit
73d14311bc
@ -460,9 +460,9 @@ void start() {
|
||||
}
|
||||
|
||||
private String formatThreadName() {
|
||||
Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
|
||||
return "DataNode: [" +
|
||||
StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
|
||||
Collection<StorageLocation> dataDirs =
|
||||
DataNode.getStorageLocations(dn.getConf());
|
||||
return "DataNode: [" + dataDirs.toString() + "] " +
|
||||
" heartbeating to " + nnAddr;
|
||||
}
|
||||
|
||||
|
@ -272,7 +272,7 @@ public static InetSocketAddress createSocketAddr(String target) {
|
||||
private JvmPauseMonitor pauseMonitor;
|
||||
|
||||
private SecureResources secureResources = null;
|
||||
private AbstractList<File> dataDirs;
|
||||
private AbstractList<StorageLocation> dataDirs;
|
||||
private Configuration conf;
|
||||
|
||||
private final List<String> usersWithLocalPathAccess;
|
||||
@ -280,21 +280,12 @@ public static InetSocketAddress createSocketAddr(String target) {
|
||||
ReadaheadPool readaheadPool;
|
||||
private final boolean getHdfsBlockLocationsEnabled;
|
||||
|
||||
/**
|
||||
* Create the DataNode given a configuration and an array of dataDirs.
|
||||
* 'dataDirs' is where the blocks are stored.
|
||||
*/
|
||||
DataNode(final Configuration conf,
|
||||
final AbstractList<File> dataDirs) throws IOException {
|
||||
this(conf, dataDirs, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the DataNode given a configuration, an array of dataDirs,
|
||||
* and a namenode proxy
|
||||
*/
|
||||
DataNode(final Configuration conf,
|
||||
final AbstractList<File> dataDirs,
|
||||
DataNode(final Configuration conf,
|
||||
final AbstractList<StorageLocation> dataDirs,
|
||||
final SecureResources resources) throws IOException {
|
||||
super(conf);
|
||||
|
||||
@ -711,7 +702,7 @@ boolean areHeartbeatsDisabledForTests() {
|
||||
* @throws IOException
|
||||
*/
|
||||
void startDataNode(Configuration conf,
|
||||
AbstractList<File> dataDirs,
|
||||
AbstractList<StorageLocation> dataDirs,
|
||||
// DatanodeProtocol namenode,
|
||||
SecureResources resources
|
||||
) throws IOException {
|
||||
@ -861,7 +852,7 @@ void shutdownBlockPool(BPOfferService bpos) {
|
||||
* If this is the first block pool to register, this also initializes
|
||||
* the datanode-scoped storage.
|
||||
*
|
||||
* @param nsInfo the handshake response from the NN.
|
||||
* @param bpos block pool to initialize and register with the NameNode.
|
||||
* @throws IOException if the NN is inconsistent with the local storage.
|
||||
*/
|
||||
void initBlockPool(BPOfferService bpos) throws IOException {
|
||||
@ -1688,17 +1679,39 @@ public static DataNode instantiateDataNode(String args [], Configuration conf,
|
||||
printUsage(System.err);
|
||||
return null;
|
||||
}
|
||||
Collection<URI> dataDirs = getStorageDirs(conf);
|
||||
Collection<StorageLocation> dataLocations = getStorageLocations(conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
|
||||
DFS_DATANODE_USER_NAME_KEY);
|
||||
return makeInstance(dataDirs, conf, resources);
|
||||
return makeInstance(dataLocations, conf, resources);
|
||||
}
|
||||
|
||||
static Collection<URI> getStorageDirs(Configuration conf) {
|
||||
Collection<String> dirNames =
|
||||
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
|
||||
return Util.stringCollectionAsURIs(dirNames);
|
||||
static Collection<StorageLocation> parseStorageLocations(
|
||||
Collection<String> rawLocations) {
|
||||
List<StorageLocation> locations =
|
||||
new ArrayList<StorageLocation>(rawLocations.size());
|
||||
|
||||
for(String locationString : rawLocations) {
|
||||
StorageLocation location;
|
||||
try {
|
||||
location = StorageLocation.parse(locationString);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Failed to parse storage location " + locationString);
|
||||
continue;
|
||||
} catch (IllegalArgumentException iae) {
|
||||
LOG.error(iae.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
locations.add(location);
|
||||
}
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
static Collection<StorageLocation> getStorageLocations(Configuration conf) {
|
||||
return parseStorageLocations(
|
||||
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
|
||||
}
|
||||
|
||||
/** Instantiate & Start a single datanode daemon and wait for it to finish.
|
||||
@ -1764,51 +1777,45 @@ public void checkDir(LocalFileSystem localFS, Path path)
|
||||
* no directory from this directory list can be created.
|
||||
* @throws IOException
|
||||
*/
|
||||
static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
|
||||
SecureResources resources) throws IOException {
|
||||
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
|
||||
Configuration conf, SecureResources resources) throws IOException {
|
||||
LocalFileSystem localFS = FileSystem.getLocal(conf);
|
||||
FsPermission permission = new FsPermission(
|
||||
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
|
||||
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
|
||||
DataNodeDiskChecker dataNodeDiskChecker =
|
||||
new DataNodeDiskChecker(permission);
|
||||
ArrayList<File> dirs =
|
||||
getDataDirsFromURIs(dataDirs, localFS, dataNodeDiskChecker);
|
||||
ArrayList<StorageLocation> locations =
|
||||
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
|
||||
DefaultMetricsSystem.initialize("DataNode");
|
||||
|
||||
assert dirs.size() > 0 : "number of data directories should be > 0";
|
||||
return new DataNode(conf, dirs, resources);
|
||||
assert locations.size() > 0 : "number of data directories should be > 0";
|
||||
return new DataNode(conf, locations, resources);
|
||||
}
|
||||
|
||||
// DataNode ctor expects AbstractList instead of List or Collection...
|
||||
static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
|
||||
static ArrayList<StorageLocation> checkStorageLocations(
|
||||
Collection<StorageLocation> dataDirs,
|
||||
LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
|
||||
throws IOException {
|
||||
ArrayList<File> dirs = new ArrayList<File>();
|
||||
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
|
||||
StringBuilder invalidDirs = new StringBuilder();
|
||||
for (URI dirURI : dataDirs) {
|
||||
if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
|
||||
LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
|
||||
invalidDirs.append("\"").append(dirURI).append("\" ");
|
||||
continue;
|
||||
}
|
||||
// drop any (illegal) authority in the URI for backwards compatibility
|
||||
File dir = new File(dirURI.getPath());
|
||||
for (StorageLocation location : dataDirs) {
|
||||
try {
|
||||
dataNodeDiskChecker.checkDir(localFS, new Path(dir.toURI()));
|
||||
dirs.add(dir);
|
||||
dataNodeDiskChecker.checkDir(localFS, new Path(location.getUri()));
|
||||
locations.add(location);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
|
||||
+ dir + " : ", ioe);
|
||||
invalidDirs.append("\"").append(dir.getCanonicalPath()).append("\" ");
|
||||
+ location.getFile() + " : ", ioe);
|
||||
invalidDirs.append("\"").append(location.getFile().getCanonicalPath()).append("\" ");
|
||||
}
|
||||
}
|
||||
if (dirs.size() == 0) {
|
||||
if (locations.size() == 0) {
|
||||
throw new IOException("All directories in "
|
||||
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
|
||||
+ invalidDirs);
|
||||
}
|
||||
return dirs;
|
||||
return locations;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -129,7 +129,8 @@ public synchronized void createStorageID() {
|
||||
* @throws IOException
|
||||
*/
|
||||
synchronized void recoverTransitionRead(DataNode datanode,
|
||||
NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
|
||||
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
|
||||
StartupOption startOpt)
|
||||
throws IOException {
|
||||
if (initialized) {
|
||||
// DN storage has been initialized, no need to do anything
|
||||
@ -145,8 +146,8 @@ synchronized void recoverTransitionRead(DataNode datanode,
|
||||
// Format and recover.
|
||||
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
||||
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
|
||||
for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dataDir = it.next();
|
||||
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dataDir = it.next().getFile();
|
||||
StorageDirectory sd = new StorageDirectory(dataDir);
|
||||
StorageState curState;
|
||||
try {
|
||||
@ -215,14 +216,14 @@ synchronized void recoverTransitionRead(DataNode datanode,
|
||||
* @throws IOException on error
|
||||
*/
|
||||
void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
|
||||
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
|
||||
Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
|
||||
// First ensure datanode level format/snapshot/rollback is completed
|
||||
recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
|
||||
|
||||
|
||||
// Create list of storage directories for the block pool
|
||||
Collection<File> bpDataDirs = new ArrayList<File>();
|
||||
for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dnRoot = it.next();
|
||||
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
|
||||
File dnRoot = it.next().getFile();
|
||||
File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
|
||||
STORAGE_DIR_CURRENT));
|
||||
bpDataDirs.add(bpRoot);
|
||||
|
@ -35,6 +35,8 @@
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -44,18 +46,9 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.*;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -80,10 +73,7 @@
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.*;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
@ -121,7 +111,7 @@ public class TestBlockRecovery {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Before
|
||||
public void startUp() throws IOException {
|
||||
public void startUp() throws IOException, URISyntaxException {
|
||||
tearDownDone = false;
|
||||
conf = new HdfsConfiguration();
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
|
||||
@ -131,11 +121,12 @@ public void startUp() throws IOException {
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||
FileSystem.setDefaultUri(conf,
|
||||
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
|
||||
ArrayList<File> dirs = new ArrayList<File>();
|
||||
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
|
||||
File dataDir = new File(DATA_DIR);
|
||||
FileUtil.fullyDelete(dataDir);
|
||||
dataDir.mkdirs();
|
||||
dirs.add(dataDir);
|
||||
StorageLocation location = new StorageLocation(new URI(dataDir.getPath()));
|
||||
locations.add(location);
|
||||
final DatanodeProtocolClientSideTranslatorPB namenode =
|
||||
mock(DatanodeProtocolClientSideTranslatorPB.class);
|
||||
|
||||
@ -161,7 +152,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
|
||||
new DatanodeCommand[0],
|
||||
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
|
||||
|
||||
dn = new DataNode(conf, dirs, null) {
|
||||
dn = new DataNode(conf, locations, null) {
|
||||
@Override
|
||||
DatanodeProtocolClientSideTranslatorPB connectToNN(
|
||||
InetSocketAddress nnAddr) throws IOException {
|
||||
|
@ -20,11 +20,14 @@
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ -34,19 +37,71 @@
|
||||
|
||||
public class TestDataDirs {
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testGetDataDirsFromURIs() throws Throwable {
|
||||
@Test (timeout = 30000)
|
||||
public void testDataDirParsing() throws Throwable {
|
||||
Configuration conf = new Configuration();
|
||||
ArrayList<StorageLocation> locations;
|
||||
File dir0 = new File("/dir0");
|
||||
File dir1 = new File("/dir1");
|
||||
File dir2 = new File("/dir2");
|
||||
File dir3 = new File("/dir3");
|
||||
|
||||
// Verify that a valid string is correctly parsed, and that storage
|
||||
// type is not case-sensitive
|
||||
String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3";
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1);
|
||||
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
|
||||
assertThat(locations.size(), is(4));
|
||||
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
|
||||
assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
|
||||
assertThat(locations.get(2).getStorageType(), is(StorageType.SSD));
|
||||
assertThat(locations.get(2).getUri(), is(dir2.toURI()));
|
||||
assertThat(locations.get(3).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(3).getUri(), is(dir3.toURI()));
|
||||
|
||||
// Verify that an unrecognized storage type is ignored.
|
||||
String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2";
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations2);
|
||||
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
|
||||
assertThat(locations.size(), is(3));
|
||||
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
|
||||
assertThat(locations.get(1).getStorageType(), is(StorageType.SSD));
|
||||
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
|
||||
assertThat(locations.get(2).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(2).getUri(), is(dir2.toURI()));
|
||||
|
||||
// Assert that a string with no storage type specified is
|
||||
// correctly parsed and the default storage type is picked up.
|
||||
String locations3 = "/dir0,/dir1";
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations3);
|
||||
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
|
||||
assertThat(locations.size(), is(2));
|
||||
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
|
||||
assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
|
||||
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testDataDirValidation() throws Throwable {
|
||||
|
||||
DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class);
|
||||
doThrow(new IOException()).doThrow(new IOException()).doNothing()
|
||||
.when(diskChecker).checkDir(any(LocalFileSystem.class), any(Path.class));
|
||||
LocalFileSystem fs = mock(LocalFileSystem.class);
|
||||
Collection<URI> uris = Arrays.asList(new URI("file:/p1/"),
|
||||
new URI("file:/p2/"), new URI("file:/p3/"));
|
||||
AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>();
|
||||
|
||||
List<File> dirs = DataNode.getDataDirsFromURIs(uris, fs, diskChecker);
|
||||
assertEquals("number of valid data dirs", 1, dirs.size());
|
||||
String validDir = dirs.iterator().next().getPath();
|
||||
assertEquals("p3 should be valid", new File("/p3").getPath(), validDir);
|
||||
locations.add(new StorageLocation(new URI("file:/p1/")));
|
||||
locations.add(new StorageLocation(new URI("file:/p2/")));
|
||||
locations.add(new StorageLocation(new URI("file:/p3/")));
|
||||
|
||||
ArrayList<StorageLocation> checkedLocations =
|
||||
DataNode.checkStorageLocations(locations, fs, diskChecker);
|
||||
assertEquals("number of valid data dirs", 1, checkedLocations.size());
|
||||
String validDir = checkedLocations.iterator().next().getFile().getPath();
|
||||
assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user