HDFS-2907. Add a conf property dfs.datanode.fsdataset.factory to make FSDataset in Datanode pluggable.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1292419 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date: 2012-02-22 17:47:39 +00:00
parent b2172c394e
commit efbc58f30c

24 changed files with 157 additions and 153 deletions
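Usage note (not part of the patch itself): after this change the DataNode obtains its dataset implementation from the factory class named by dfs.datanode.fsdataset.factory; FSDatasetInterface.Factory.getFactory(conf) falls back to the on-disk FSDataset factory when the key is unset, and SimulatedFSDataset.setFactory(conf) points the key at the simulated implementation, as the updated tests below do. A minimal sketch of how the new key behaves, mirroring the testFSDatasetFactory test added in this change; the FsDatasetFactoryExample driver class is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

// Illustrative driver class (not part of this commit).
public class FsDatasetFactoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // With dfs.datanode.fsdataset.factory unset, the default on-disk factory is returned.
    FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
    System.out.println(f.getClass().getName() + " simulated=" + f.isSimulated());

    // Point the key at the simulated implementation, then fetch the factory again.
    SimulatedFSDataset.setFactory(conf);
    f = FSDatasetInterface.Factory.getFactory(conf);
    System.out.println(f.getClass().getName() + " simulated=" + f.isSimulated());
  }
}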


@@ -238,6 +238,9 @@ Release 0.23.2 - UNRELEASED
     HDFS-2725. hdfs script usage information is missing the information
     about "dfs" command (Prashant Sharma via stevel)
 
+    HDFS-2907. Add a conf property dfs.datanode.fsdataset.factory to make
+    FSDataset in Datanode pluggable.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES


@@ -171,7 +171,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
   public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
-  public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
   public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
   public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
@@ -215,10 +214,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
-  public static final String DFS_DATANODE_SIMULATEDDATASTORAGE_KEY = "dfs.datanode.simulateddatastorage";
-  public static final boolean DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT = false;
-  public static final String DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_KEY = "dfs.datanode.simulateddatastorage.capacity";
-  public static final long DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_DEFAULT = 2L<<40;
   public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
@@ -286,6 +281,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   //Keys with no defaults
   public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
+  public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
   public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
   public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";


@@ -88,7 +88,6 @@ private static void addDeprecatedKeys() {
     deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY);
     deprecate("dfs.upgrade.permission", DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY);
     deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
-    deprecate("StorageId", DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY);
     deprecate("dfs.https.client.keystore.resource", DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY);
     deprecate("dfs.https.need.client.auth", DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY);
     deprecate("slave.host.name", DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);


@@ -43,10 +43,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
@@ -162,13 +159,11 @@
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
@@ -437,13 +432,14 @@ void refreshNamenodes(Configuration conf)
     }
   }
 
-  private synchronized void setClusterId(String cid) throws IOException {
-    if(clusterId != null && !clusterId.equals(cid)) {
-      throw new IOException ("cluster id doesn't match. old cid=" + clusterId
-          + " new cid="+ cid);
+  private synchronized void setClusterId(final String nsCid, final String bpid
+      ) throws IOException {
+    if(clusterId != null && !clusterId.equals(nsCid)) {
+      throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId
+          + " but ns cid="+ nsCid + "; bpid=" + bpid);
     }
     // else
-    clusterId = cid;
+    clusterId = nsCid;
   }
 
   private static String getHostName(Configuration config)
@@ -845,51 +841,22 @@ void shutdownBlockPool(BPOfferService bpos) {
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
     NamespaceInfo nsInfo = bpos.getNamespaceInfo();
-    Preconditions.checkState(nsInfo != null,
-        "Block pool " + bpos + " should have retrieved " +
-        "its namespace info before calling initBlockPool.");
-
-    String blockPoolId = nsInfo.getBlockPoolID();
+    if (nsInfo == null) {
+      throw new IOException("NamespaceInfo not found: Block pool " + bpos
+          + " should have retrieved namespace info before initBlockPool.");
+    }
 
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
 
-    synchronized (this) {
-      // we do not allow namenode from different cluster to register
-      if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
-        throw new IOException(
-            "cannot register with the namenode because clusterid do not match:"
-            + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
-            ";dn cid=" + clusterId);
-      }
-      setClusterId(nsInfo.clusterID);
-    }
-
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-
-    if (!simulatedFSDataset) {
-      // read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
-          dataDirs, startOpt);
-      StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
-      LOG.info("setting up storage: nsid=" +
-          bpStorage.getNamespaceID() + ";bpid="
-          + blockPoolId + ";lv=" + storage.getLayoutVersion() +
-          ";nsInfo=" + nsInfo);
-    }
+    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
-    initFsDataSet();
+    initStorage(nsInfo);
     initPeriodicScanners(conf);
 
-    data.addBlockPool(blockPoolId, conf);
+    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
   }
 
   /**
@@ -916,31 +883,28 @@ int getBpOsCount() {
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
    */
-  private synchronized void initFsDataSet() throws IOException {
-    if (data != null) { // Already initialized
-      return;
-    }
+  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
+    final FSDatasetInterface.Factory factory
+        = FSDatasetInterface.Factory.getFactory(conf);
 
-    // get version and id info from the name-node
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-
-    if (simulatedFSDataset) {
-      storage.createStorageID(getPort());
-      // it would have been better to pass storage as a parameter to
-      // constructor below - need to augment ReflectionUtils used below.
-      conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
-      try {
-        data = (FSDatasetInterface) ReflectionUtils.newInstance(
-            Class.forName(
-            "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
-            conf);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(StringUtils.stringifyException(e));
+    if (!factory.isSimulated()) {
+      final StartupOption startOpt = getStartupOption(conf);
+      if (startOpt == null) {
+        throw new IOException("Startup option not set.");
       }
-    } else {
-      data = new FSDataset(this, storage, conf);
+      final String bpid = nsInfo.getBlockPoolID();
+      //read storage info, lock data dirs and transition fs state if necessary
+      storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      final StorageInfo bpStorage = storage.getBPStorage(bpid);
+      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+          + ";nsInfo=" + nsInfo);
+    }
+
+    synchronized(this) {
+      if (data == null) {
+        data = factory.createFSDatasetInterface(this, storage, conf);
+      }
     }
   }


@@ -75,6 +75,16 @@
  ***************************************************/
 @InterfaceAudience.Private
 class FSDataset implements FSDatasetInterface {
+  /**
+   * A factory for creating FSDataset objects.
+   */
+  static class Factory extends FSDatasetInterface.Factory {
+    @Override
+    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+        DataStorage storage, Configuration conf) throws IOException {
+      return new FSDataset(datanode, storage, conf);
+    }
+  }
 
   /**
    * A node type that can be built into a tree reflecting the
@@ -1056,8 +1066,8 @@ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
   /**
    * An FSDataset has a directory where it loads its data files.
    */
-  FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
-      throws IOException {
+  private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
+      ) throws IOException {
     this.datanode = datanode;
     this.maxBlocksPerDir =
       conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,


@@ -29,6 +29,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -49,6 +51,30 @@
  */
 @InterfaceAudience.Private
 public interface FSDatasetInterface extends FSDatasetMBean {
+  /**
+   * A factory for creating FSDatasetInterface objects.
+   */
+  public abstract class Factory {
+    /** @return the configured factory. */
+    public static Factory getFactory(Configuration conf) {
+      final Class<? extends Factory> clazz = conf.getClass(
+          DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+          FSDataset.Factory.class,
+          Factory.class);
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    /** Create a FSDatasetInterface object. */
+    public abstract FSDatasetInterface createFSDatasetInterface(
+        DataNode datanode, DataStorage storage, Configuration conf
+        ) throws IOException;
+
+    /** Does the factory create simulated objects? */
+    public boolean isSimulated() {
+      return false;
+    }
+  }
+
   /**
    * This is an interface for the underlying volume.
    * @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume


@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog;
 import org.apache.hadoop.net.DNS;
@@ -122,10 +123,9 @@ public static void main(String[] args) {
       }
       dataNodeDirs = args[i];
     } else if (args[i].equals("-simulated")) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     } else if (args[i].equals("-inject")) {
-      if (!conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED,
-          false) ) {
+      if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) {
         System.out.print("-inject is valid only for simulated");
         printUsageExit();
       }
@@ -158,7 +158,7 @@ public static void main(String[] args) {
       System.exit(-1);
     }
     boolean simulated =
-        conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, false);
+        FSDatasetInterface.Factory.getFactory(conf).isSimulated();
     System.out.println("Starting " + numDataNodes +
         (simulated ? " Simulated " : " ") +
         " Data Nodes that will connect to Name Node at " + nameNodeAdr);


@@ -842,7 +842,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
         conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
       }
       if (simulatedCapacities != null) {
-        dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+        SimulatedFSDataset.setFactory(dnConf);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }


@@ -107,7 +107,7 @@ private void checkFile(FileSystem fileSys, Path name, int repl)
   public void testCopyOnWrite() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -178,7 +178,7 @@ public void testCopyOnWrite() throws IOException {
   public void testSimpleFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -234,7 +234,7 @@ public void testSimpleFlush() throws IOException {
   public void testComplexFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -283,7 +283,7 @@ public void testComplexFlush() throws IOException {
   public void testFileNotFound() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();


@@ -82,7 +82,7 @@ public class TestFileAppend2 extends TestCase {
   public void testSimpleAppend() throws IOException {
     final Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
     conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);


@@ -77,7 +77,7 @@ public class TestFileAppend4 {
   public void setUp() throws Exception {
     this.conf = new Configuration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);


@@ -145,7 +145,7 @@ public void testServerDefaults() throws IOException {
   public void testFileCreation() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -224,7 +224,7 @@ public void testFileCreation() throws IOException {
   public void testDeleteOnExit() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -288,7 +288,7 @@ public void testFileCreationError1() throws IOException {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -362,7 +362,7 @@ public void testFileCreationError2() throws IOException {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -461,7 +461,7 @@ public void xxxtestFileCreationNamenodeRestart() throws IOException {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     // create cluster
@@ -600,7 +600,7 @@ public void testDFSClientDeath() throws IOException, InterruptedException {
     Configuration conf = new HdfsConfiguration();
     System.out.println("Testing adbornal client death.");
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -635,7 +635,7 @@ public void testDFSClientDeath() throws IOException, InterruptedException {
   public void testFileCreationNonRecursive() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();


@@ -137,7 +137,7 @@ public void testInjection() throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize);
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     //first time format
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     cluster.waitActive();
@@ -160,7 +160,7 @@ public void testInjection() throws IOException {
     LOG.info("Restarting minicluster");
     conf = new HdfsConfiguration();
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
     cluster = new MiniDFSCluster.Builder(conf)


@@ -175,7 +175,7 @@ public void runTest(final long blockSize) throws IOException {
     Configuration conf = new Configuration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();


@@ -206,7 +206,7 @@ private void dfsPreadTest(boolean disableTransferTo) throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {
-      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     if (disableTransferTo) {
       conf.setBoolean("dfs.datanode.transferTo.allowed", false);


@@ -200,7 +200,7 @@ public void runReplication(boolean simulated) throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     if (simulated) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numDatanodes)


@@ -28,7 +28,7 @@ public class TestSetrepIncreasing extends TestCase {
   static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "" + fromREP);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);


@@ -124,7 +124,7 @@ public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
         UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
@@ -248,7 +248,7 @@ public void testSkipWithVerifyChecksum() throws IOException {
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
         UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();


@@ -93,7 +93,7 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
   public void testSmallBlock() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();


@@ -77,7 +77,7 @@ static void initConf(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }


@@ -22,7 +22,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -63,21 +61,33 @@
  *
  * Note the synchronization is coarse grained - it is at each method.
  */
-public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
+public class SimulatedFSDataset implements FSDatasetInterface {
+  static class Factory extends FSDatasetInterface.Factory {
+    @Override
+    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+        DataStorage storage, Configuration conf) throws IOException {
+      return new SimulatedFSDataset(datanode, storage, conf);
+    }
+
+    @Override
+    public boolean isSimulated() {
+      return true;
+    }
+  }
+
+  public static void setFactory(Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        Factory.class.getName());
+  }
 
-  public static final String CONFIG_PROPERTY_SIMULATED =
-      DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
   public static final String CONFIG_PROPERTY_CAPACITY =
-      DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_KEY;
+      "dfs.datanode.simulateddatastorage.capacity";
 
   public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
-  public static final byte DEFAULT_DATABYTE = 9; // 1 terabyte
-  byte simulatedDataByte = DEFAULT_DATABYTE;
-  Configuration conf = null;
+  public static final byte DEFAULT_DATABYTE = 9;
 
-  static byte[] nullCrcFileData;
-  {
+  static final byte[] nullCrcFileData;
+  static {
     DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
         CHECKSUM_NULL, 16*1024 );
     byte[] nullCrcHeader = checksum.getHeader();
@@ -360,31 +370,22 @@ private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
     }
   }
 
-  private Map<String, Map<Block, BInfo>> blockMap = null;
-  private SimulatedStorage storage = null;
-  private String storageId;
+  private final Map<String, Map<Block, BInfo>> blockMap
+      = new HashMap<String, Map<Block,BInfo>>();
+  private final SimulatedStorage storage;
+  private final String storageId;
 
-  public SimulatedFSDataset(Configuration conf) {
-    setConf(conf);
-  }
-
-  // Constructor used for constructing the object using reflection
-  @SuppressWarnings("unused")
-  private SimulatedFSDataset() { // real construction when setConf called..
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration iconf) {
-    conf = iconf;
-    storageId = conf.get(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, "unknownStorageId" +
-        new Random().nextInt());
+  public SimulatedFSDataset(DataNode datanode, DataStorage storage,
+      Configuration conf) {
+    if (storage != null) {
+      storage.createStorageID(datanode.getPort());
+      this.storageId = storage.getStorageID();
+    } else {
+      this.storageId = "unknownStorageId" + new Random().nextInt();
+    }
     registerMBean(storageId);
-    storage = new SimulatedStorage(
+    this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
-    blockMap = new HashMap<String, Map<Block,BInfo>>();
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -441,23 +442,16 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
 
   @Override
   public synchronized BlockListAsLongs getBlockReport(String bpid) {
+    final List<Block> blocks = new ArrayList<Block>();
     final Map<Block, BInfo> map = blockMap.get(bpid);
-    Block[] blockTable = new Block[map.size()];
     if (map != null) {
-      int count = 0;
       for (BInfo b : map.values()) {
         if (b.isFinalized()) {
-          blockTable[count++] = b.theBlock;
+          blocks.add(b.theBlock);
         }
       }
-      if (count != blockTable.length) {
-        blockTable = Arrays.copyOf(blockTable, count);
-      }
-    } else {
-      blockTable = new Block[0];
     }
-    return new BlockListAsLongs(
-        new ArrayList<Block>(Arrays.asList(blockTable)), null);
+    return new BlockListAsLongs(blocks, null);
   }
 
   @Override // FSDatasetMBean


@@ -34,7 +34,7 @@ public class TestDataNodeMetrics extends TestCase {
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       FileSystem fs = cluster.getFileSystem();


@@ -44,8 +44,8 @@ public class TestSimulatedFSDataset extends TestCase {
   protected void setUp() throws Exception {
     super.setUp();
     conf = new HdfsConfiguration();
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
   }
 
   protected void tearDown() throws Exception {
@@ -87,6 +87,18 @@ int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException {
     return addSomeBlocks(fsdataset, 1);
   }
 
+  public void testFSDatasetFactory() {
+    final Configuration conf = new Configuration();
+    FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
+    assertEquals(FSDataset.Factory.class, f.getClass());
+    assertFalse(f.isSimulated());
+
+    SimulatedFSDataset.setFactory(conf);
+    FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf);
+    assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
+    assertTrue(s.isSimulated());
+  }
+
   public void testGetMetaData() throws IOException {
     FSDatasetInterface fsdataset = getSimulatedFSDataset();
     ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
@@ -287,8 +299,8 @@ public void testInvalidate() throws IOException {
     }
   }
 
-  private SimulatedFSDataset getSimulatedFSDataset() throws IOException {
-    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf);
+  private SimulatedFSDataset getSimulatedFSDataset() {
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, null, conf);
     fsdataset.addBlockPool(bpid, conf);
     return fsdataset;
   }


@@ -84,7 +84,7 @@ public void testFileLimit() throws IOException {
     int currentNodes = 0;
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();