HDFS-5157. Add StorageType to FsVolume. Contributed by Junping Du
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1521743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
23bf148a2f
commit
b2976af140
@ -21,3 +21,5 @@ IMPROVEMENTS:
|
|||||||
HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and
|
HDFS-5134. Move blockContentsStale, heartbeatedSinceFailover and
|
||||||
firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and
|
firstBlockReport from DatanodeDescriptor to DatanodeStorageInfo; and
|
||||||
fix a synchronization problem in DatanodeStorageInfo. (szetszwo)
|
fix a synchronization problem in DatanodeStorageInfo. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-5157. Add StorageType to FsVolume. (Junping Du via szetszwo)
|
||||||
|
@ -52,7 +52,6 @@
|
|||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
@ -62,7 +61,6 @@
|
|||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
@ -122,7 +120,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
@ -160,7 +157,6 @@
|
|||||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||||
import org.apache.hadoop.util.ServicePlugin;
|
import org.apache.hadoop.util.ServicePlugin;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
@ -1709,7 +1705,8 @@ static Collection<StorageLocation> parseStorageLocations(
|
|||||||
return locations;
|
return locations;
|
||||||
}
|
}
|
||||||
|
|
||||||
static Collection<StorageLocation> getStorageLocations(Configuration conf) {
|
public static Collection<StorageLocation> getStorageLocations(
|
||||||
|
Configuration conf) {
|
||||||
return parseStorageLocations(
|
return parseStorageLocations(
|
||||||
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
|
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is an interface for the underlying volume.
|
* This is an interface for the underlying volume.
|
||||||
*/
|
*/
|
||||||
@ -38,4 +40,6 @@ public interface FsVolumeSpi {
|
|||||||
|
|
||||||
/** @return the directory for the finalized blocks in the block pool. */
|
/** @return the directory for the finalized blocks in the block pool. */
|
||||||
public File getFinalizedDir(String bpid) throws IOException;
|
public File getFinalizedDir(String bpid) throws IOException;
|
||||||
|
|
||||||
|
public StorageType getStorageType();
|
||||||
}
|
}
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
@ -65,6 +66,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||||
@ -188,6 +190,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
|||||||
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
|
||||||
|
|
||||||
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
||||||
|
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
|
||||||
|
|
||||||
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
|
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
|
||||||
int volsFailed = volsConfigured - storage.getNumStorageDirs();
|
int volsFailed = volsConfigured - storage.getNumStorageDirs();
|
||||||
@ -209,8 +212,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
|||||||
storage.getNumStorageDirs());
|
storage.getNumStorageDirs());
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
||||||
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
|
// TODO: getStorageTypeFromLocations() is only a temporary workaround and
|
||||||
LOG.info("Added volume - " + dir);
|
// should be replaced with getting storage type from DataStorage (missing
|
||||||
|
// storage type now) directly.
|
||||||
|
final StorageType storageType = getStorageTypeFromLocations(dataLocations, dir);
|
||||||
|
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf,
|
||||||
|
storageType));
|
||||||
|
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||||
}
|
}
|
||||||
volumeMap = new ReplicaMap(this);
|
volumeMap = new ReplicaMap(this);
|
||||||
|
|
||||||
@ -231,6 +239,16 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
|||||||
registerMBean(storage.getStorageID());
|
registerMBean(storage.getStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StorageType getStorageTypeFromLocations(
|
||||||
|
Collection<StorageLocation> dataLocations, File dir) {
|
||||||
|
for (StorageLocation dataLocation : dataLocations) {
|
||||||
|
if (dataLocation.getFile().equals(dir)) {
|
||||||
|
return dataLocation.getStorageType();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return StorageType.DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the total space used by dfs datanode
|
* Return the total space used by dfs datanode
|
||||||
*/
|
*/
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.fs.DF;
|
import org.apache.hadoop.fs.DF;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
@ -43,6 +44,7 @@
|
|||||||
class FsVolumeImpl implements FsVolumeSpi {
|
class FsVolumeImpl implements FsVolumeSpi {
|
||||||
private final FsDatasetImpl dataset;
|
private final FsDatasetImpl dataset;
|
||||||
private final String storageID;
|
private final String storageID;
|
||||||
|
private final StorageType storageType;
|
||||||
private final Map<String, BlockPoolSlice> bpSlices
|
private final Map<String, BlockPoolSlice> bpSlices
|
||||||
= new HashMap<String, BlockPoolSlice>();
|
= new HashMap<String, BlockPoolSlice>();
|
||||||
private final File currentDir; // <StorageDirectory>/current
|
private final File currentDir; // <StorageDirectory>/current
|
||||||
@ -50,7 +52,7 @@ class FsVolumeImpl implements FsVolumeSpi {
|
|||||||
private final long reserved;
|
private final long reserved;
|
||||||
|
|
||||||
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf, StorageType storageType) throws IOException {
|
||||||
this.dataset = dataset;
|
this.dataset = dataset;
|
||||||
this.storageID = storageID;
|
this.storageID = storageID;
|
||||||
this.reserved = conf.getLong(
|
this.reserved = conf.getLong(
|
||||||
@ -59,6 +61,7 @@ class FsVolumeImpl implements FsVolumeSpi {
|
|||||||
this.currentDir = currentDir;
|
this.currentDir = currentDir;
|
||||||
File parent = currentDir.getParentFile();
|
File parent = currentDir.getParentFile();
|
||||||
this.usage = new DF(parent, conf);
|
this.usage = new DF(parent, conf);
|
||||||
|
this.storageType = storageType;
|
||||||
}
|
}
|
||||||
|
|
||||||
File getCurrentDir() {
|
File getCurrentDir() {
|
||||||
@ -290,4 +293,9 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
|
|||||||
String getStorageID() {
|
String getStorageID() {
|
||||||
return storageID;
|
return storageID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StorageType getStorageType() {
|
||||||
|
return storageType;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,7 @@ int numberOfFailedVolumes() {
|
|||||||
* @param blockSize free space needed on the volume
|
* @param blockSize free space needed on the volume
|
||||||
* @return next volume to store the block in.
|
* @return next volume to store the block in.
|
||||||
*/
|
*/
|
||||||
|
// TODO this will be replaced by getting volume from StorageID directly later.
|
||||||
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
|
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
|
||||||
return blockChooser.chooseVolume(volumes, blockSize);
|
return blockChooser.chooseVolume(volumes, blockSize);
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
@ -406,6 +407,11 @@ public String getPath(String bpid) throws IOException {
|
|||||||
public File getFinalizedDir(String bpid) throws IOException {
|
public File getFinalizedDir(String bpid) throws IOException {
|
||||||
return new File("/base/current/" + bpid + "/finalized");
|
return new File("/base/current/" + bpid + "/finalized");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StorageType getStorageType() {
|
||||||
|
return StorageType.DEFAULT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
|
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
|
||||||
|
Loading…
Reference in New Issue
Block a user