HDFS-7034. Archival Storage: Fix TestBlockPlacement and TestStorageMover. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2014-09-11 13:00:43 -07:00
parent 70dfe9cfab
commit 0d85f7e591
8 changed files with 215 additions and 160 deletions

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.StorageType;
@ -88,7 +89,11 @@ public class Dispatcher {
private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB; private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds /**
* the period of time to delay the usage of a DataNode after hitting
* errors when using it for migrating data
*/
private static long delayAfterErrors = 10 * 1000;
private final NameNodeConnector nnc; private final NameNodeConnector nnc;
private final SaslDataTransferClient saslClient; private final SaslDataTransferClient saslClient;
@ -112,6 +117,7 @@ public class Dispatcher {
private final ExecutorService moveExecutor; private final ExecutorService moveExecutor;
private final ExecutorService dispatchExecutor; private final ExecutorService dispatchExecutor;
/** The maximum number of concurrent blocks moves at a datanode */ /** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode; private final int maxConcurrentMovesPerNode;
@ -187,10 +193,12 @@ private PendingMove(Source source, StorageGroup target) {
@Override @Override
public String toString() { public String toString() {
final Block b = block.getBlock(); final Block b = block != null ? block.getBlock() : null;
return b + " with size=" + b.getNumBytes() + " from " String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
+ source.getDisplayName() + " to " + target.getDisplayName() : " ";
+ " through " + proxySource.datanode; return bStr + "from " + source.getDisplayName() + " to " + target
.getDisplayName() + " through " + (proxySource != null ? proxySource
.datanode : "");
} }
/** /**
@ -316,8 +324,8 @@ private void dispatch() {
// further in order to avoid a potential storm of "threads quota // further in order to avoid a potential storm of "threads quota
// exceeded" warnings when the dispatcher gets out of sync with work // exceeded" warnings when the dispatcher gets out of sync with work
// going on in datanodes. // going on in datanodes.
proxySource.activateDelay(DELAY_AFTER_ERROR); proxySource.activateDelay(delayAfterErrors);
target.getDDatanode().activateDelay(DELAY_AFTER_ERROR); target.getDDatanode().activateDelay(delayAfterErrors);
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(out);
IOUtils.closeStream(in); IOUtils.closeStream(in);
@ -1043,6 +1051,11 @@ public static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time; blockMoveWaitTime = time;
} }
@VisibleForTesting
public static void setDelayAfterErrors(long time) {
delayAfterErrors = time;
}
/** shutdown thread pools */ /** shutdown thread pools */
public void shutdownNow() { public void shutdownNow() {
if (dispatchExecutor != null) { if (dispatchExecutor != null) {

View File

@ -25,13 +25,16 @@
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@ -53,6 +56,7 @@ public class NameNodeConnector implements Closeable {
private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
private static final int MAX_NOT_CHANGED_ITERATIONS = 5; private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
private static boolean createIdFile = true;
/** Create {@link NameNodeConnector} for the given namenodes. */ /** Create {@link NameNodeConnector} for the given namenodes. */
public static List<NameNodeConnector> newNameNodeConnectors( public static List<NameNodeConnector> newNameNodeConnectors(
@ -83,6 +87,11 @@ public static List<NameNodeConnector> newNameNodeConnectors(
return connectors; return connectors;
} }
@VisibleForTesting
public static void setCreateIdFile(boolean create) {
createIdFile = create;
}
private final URI nameNodeUri; private final URI nameNodeUri;
private final String blockpoolID; private final String blockpoolID;
@ -117,9 +126,10 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
final FsServerDefaults defaults = fs.getServerDefaults(new Path("/")); final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
this.keyManager = new KeyManager(blockpoolID, namenode, this.keyManager = new KeyManager(blockpoolID, namenode,
defaults.getEncryptDataTransfer(), conf); defaults.getEncryptDataTransfer(), conf);
// Exit if there is another one running. // if it is for test, we do not create the id file
out = checkAndMarkRunning(); out = createIdFile ? checkAndMarkRunning() : null;
if (out == null) { if (createIdFile && out == null) {
// Exit if there is another one running.
throw new IOException("Another " + name + " is running."); throw new IOException("Another " + name + " is running.");
} }
} }
@ -188,9 +198,9 @@ public boolean shouldContinue(long dispatchBlockMoveBytes) {
*/ */
private OutputStream checkAndMarkRunning() throws IOException { private OutputStream checkAndMarkRunning() throws IOException {
try { try {
final DataOutputStream out = fs.create(idPath); final FSDataOutputStream out = fs.create(idPath);
out.writeBytes(InetAddress.getLocalHost().getHostName()); out.writeBytes(InetAddress.getLocalHost().getHostName());
out.flush(); out.hflush();
return out; return out;
} catch(RemoteException e) { } catch(RemoteException e) {
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){

View File

@ -1502,7 +1502,7 @@ public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
* @throws IOException * @throws IOException
* if the number of targets < minimum replication. * if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node, * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* List, boolean, Set, long, StorageType) * Set, long, List, BlockStoragePolicy)
*/ */
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final DatanodeDescriptor client, final int numOfReplicas, final DatanodeDescriptor client,
@ -2811,7 +2811,7 @@ static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
return false; // only consider delHint for the first case return false; // only consider delHint for the first case
} else if (delHint == null) { } else if (delHint == null) {
return false; // no delHint return false; // no delHint
} else if (!excessTypes.remove(delHint.getStorageType())) { } else if (!excessTypes.contains(delHint.getStorageType())) {
return false; // delHint storage type is not an excess type return false; // delHint storage type is not an excess type
} else { } else {
// check if removing delHint reduces the number of racks // check if removing delHint reduces the number of racks

View File

@ -1744,7 +1744,9 @@ private class DataTransfer implements Runnable {
+ b + " (numBytes=" + b.getNumBytes() + ")" + b + " (numBytes=" + b.getNumBytes() + ")"
+ ", stage=" + stage + ", stage=" + stage
+ ", clientname=" + clientname + ", clientname=" + clientname
+ ", targets=" + Arrays.asList(targets)); + ", targets=" + Arrays.asList(targets)
+ ", target storage types=" + (targetStorageTypes == null ? "[]" :
Arrays.asList(targetStorageTypes)));
} }
this.targets = targets; this.targets = targets;
this.targetStorageTypes = targetStorageTypes; this.targetStorageTypes = targetStorageTypes;

View File

@ -328,8 +328,6 @@ private boolean processFile(HdfsLocatedFileStatus status) {
if (scheduleMoves4Block(diff, lb)) { if (scheduleMoves4Block(diff, lb)) {
hasRemaining |= (diff.existing.size() > 1 && hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1); diff.expected.size() > 1);
} else {
hasRemaining = false; // not able to schedule any move
} }
} }
} }
@ -453,9 +451,11 @@ public String toString() {
static int run(Map<URI, List<Path>> namenodes, Configuration conf) static int run(Map<URI, List<Path>> namenodes, Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final long sleeptime = 2000 * conf.getLong( final long sleeptime =
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();

View File

@ -256,7 +256,7 @@ private static void run(CheckChooseStorageTypes method) {
final short replication = 3; final short replication = 3;
{ {
final List<StorageType> chosen = Arrays.asList(); final List<StorageType> chosen = Lists.newArrayList();
method.checkChooseStorageTypes(hot, replication, chosen, method.checkChooseStorageTypes(hot, replication, chosen,
StorageType.DISK, StorageType.DISK, StorageType.DISK); StorageType.DISK, StorageType.DISK, StorageType.DISK);
method.checkChooseStorageTypes(warm, replication, chosen, method.checkChooseStorageTypes(warm, replication, chosen,
@ -393,7 +393,7 @@ public void testChooseStorageTypesWithDiskUnavailableAndNewBlock() {
final EnumSet<StorageType> unavailables = disk; final EnumSet<StorageType> unavailables = disk;
final boolean isNewBlock = true; final boolean isNewBlock = true;
{ {
final List<StorageType> chosen = Arrays.asList(); final List<StorageType> chosen = Lists.newArrayList();
checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
@ -500,7 +500,7 @@ private static void runWithArchiveUnavailable(CheckChooseStorageTypes method) {
final short replication = 3; final short replication = 3;
{ {
final List<StorageType> chosen = Arrays.asList(); final List<StorageType> chosen = Lists.newArrayList();
method.checkChooseStorageTypes(hot, replication, chosen, method.checkChooseStorageTypes(hot, replication, chosen,
StorageType.DISK, StorageType.DISK, StorageType.DISK); StorageType.DISK, StorageType.DISK, StorageType.DISK);
method.checkChooseStorageTypes(warm, replication, chosen, method.checkChooseStorageTypes(warm, replication, chosen,
@ -603,7 +603,7 @@ public void testChooseStorageTypesWithDiskUnavailableAndNonNewBlock() {
final EnumSet<StorageType> unavailables = disk; final EnumSet<StorageType> unavailables = disk;
final boolean isNewBlock = false; final boolean isNewBlock = false;
{ {
final List<StorageType> chosen = Arrays.asList(); final List<StorageType> chosen = Lists.newArrayList();
checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,

View File

@ -44,9 +44,13 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -66,6 +70,8 @@ public class TestStorageMover {
).getLogger().setLevel(Level.ALL); ).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(Dispatcher.class) ((Log4JLogger)LogFactory.getLog(Dispatcher.class)
).getLogger().setLevel(Level.ALL); ).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(DataTransferProtocol.class)).getLogger()
.setLevel(Level.ALL);
} }
private static final int BLOCK_SIZE = 1024; private static final int BLOCK_SIZE = 1024;
@ -80,6 +86,8 @@ public class TestStorageMover {
static { static {
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
2L);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF); DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
@ -87,6 +95,9 @@ public class TestStorageMover {
WARM = DEFAULT_POLICIES.getPolicy("WARM"); WARM = DEFAULT_POLICIES.getPolicy("WARM");
COLD = DEFAULT_POLICIES.getPolicy("COLD"); COLD = DEFAULT_POLICIES.getPolicy("COLD");
Dispatcher.setBlockMoveWaitTime(1000L); Dispatcher.setBlockMoveWaitTime(1000L);
Dispatcher.setDelayAfterErrors(1000L);
// do not create id file since we will eat up all the disk space
NameNodeConnector.setCreateIdFile(false);
} }
/** /**
@ -151,7 +162,7 @@ static class ClusterScheme {
ClusterScheme() { ClusterScheme() {
this(DEFAULT_CONF, NUM_DATANODES, REPL, this(DEFAULT_CONF, NUM_DATANODES, REPL,
genStorageTypes(NUM_DATANODES, 1, 1), null); genStorageTypes(NUM_DATANODES), null);
} }
ClusterScheme(Configuration conf, int numDataNodes, short repl, ClusterScheme(Configuration conf, int numDataNodes, short repl,
@ -195,7 +206,7 @@ void setupCluster() throws Exception {
dfs = cluster.getFileSystem(); dfs = cluster.getFileSystem();
} }
private void runBasicTest(boolean shotdown) throws Exception { private void runBasicTest(boolean shutdown) throws Exception {
setupCluster(); setupCluster();
try { try {
prepareNamespace(); prepareNamespace();
@ -205,7 +216,7 @@ private void runBasicTest(boolean shotdown) throws Exception {
migrate(); migrate();
verify(true); verify(true);
} finally { } finally {
if (shotdown) { if (shutdown) {
shutdownCluster(); shutdownCluster();
} }
} }
@ -233,7 +244,7 @@ void setStoragePolicy() throws Exception {
/** /**
* Run the migration tool. * Run the migration tool.
*/ */
void migrate(String... args) throws Exception { void migrate() throws Exception {
runMover(); runMover();
Thread.sleep(5000); // let the NN finish deletion Thread.sleep(5000); // let the NN finish deletion
} }
@ -242,6 +253,9 @@ void migrate(String... args) throws Exception {
* Verify block locations after running the migration tool. * Verify block locations after running the migration tool.
*/ */
void verify(boolean verifyAll) throws Exception { void verify(boolean verifyAll) throws Exception {
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
if (verifyAll) { if (verifyAll) {
verifyNamespace(); verifyNamespace();
} else { } else {
@ -308,7 +322,8 @@ private void verifyFile(final Path parent, final HdfsFileStatus status,
final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
lb.getStorageTypes()); lb.getStorageTypes());
Assert.assertTrue(fileStatus.getFullName(parent.toString()) Assert.assertTrue(fileStatus.getFullName(parent.toString())
+ " with policy " + policy + " has non-empty overlap: " + diff, + " with policy " + policy + " has non-empty overlap: " + diff
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
diff.removeOverlap()); diff.removeOverlap());
} }
} }
@ -378,6 +393,7 @@ public String toString() {
return "[disk=" + disk + ", archive=" + archive + "]"; return "[disk=" + disk + ", archive=" + archive + "]";
} }
} }
private static StorageType[][] genStorageTypes(int numDataNodes) { private static StorageType[][] genStorageTypes(int numDataNodes) {
return genStorageTypes(numDataNodes, 0, 0); return genStorageTypes(numDataNodes, 0, 0);
} }
@ -414,21 +430,6 @@ private static long[][] genCapacities(int nDatanodes, int numAllDisk,
return capacities; return capacities;
} }
/**
* A normal case for Mover: move a file into archival storage
*/
@Test
public void testMigrateFileToArchival() throws Exception {
final Path foo = new Path("/foo");
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
policyMap.put(foo, COLD);
NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
2*BLOCK_SIZE, null, policyMap);
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
}
private static class PathPolicyMap { private static class PathPolicyMap {
final Map<Path, BlockStoragePolicy> map = Maps.newHashMap(); final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
final Path hot = new Path("/hot"); final Path hot = new Path("/hot");
@ -472,22 +473,42 @@ void moveAround(DistributedFileSystem dfs) throws Exception {
} }
} }
/**
* A normal case for Mover: move a file into archival storage
*/
@Test
public void testMigrateFileToArchival() throws Exception {
LOG.info("testMigrateFileToArchival");
final Path foo = new Path("/foo");
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
policyMap.put(foo, COLD);
NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
2*BLOCK_SIZE, null, policyMap);
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
}
/** /**
* Test directories with Hot, Warm and Cold polices. * Test directories with Hot, Warm and Cold polices.
*/ */
@Test @Test
public void testHotWarmColdDirs() throws Exception { public void testHotWarmColdDirs() throws Exception {
LOG.info("testHotWarmColdDirs");
PathPolicyMap pathPolicyMap = new PathPolicyMap(3); PathPolicyMap pathPolicyMap = new PathPolicyMap(3);
NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
ClusterScheme clusterScheme = new ClusterScheme(); ClusterScheme clusterScheme = new ClusterScheme();
MigrationTest test = new MigrationTest(clusterScheme, nsScheme); MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.runBasicTest(false); try {
test.runBasicTest(false);
pathPolicyMap.moveAround(test.dfs);
test.migrate();
pathPolicyMap.moveAround(test.dfs); test.verify(true);
test.migrate(); } finally {
test.verify(true); test.shutdownCluster();
test.shutdownCluster(); }
} }
/** /**
@ -495,76 +516,81 @@ public void testHotWarmColdDirs() throws Exception {
*/ */
@Test @Test
public void testNoSpaceDisk() throws Exception { public void testNoSpaceDisk() throws Exception {
LOG.info("testNoSpaceDisk");
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
final long archiveCapacity = 100*BLOCK_SIZE; * BLOCK_SIZE;
final long archiveCapacity = 100 * BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity); diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, Configuration conf = new Configuration(DEFAULT_CONF);
final ClusterScheme clusterScheme = new ClusterScheme(conf,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.runBasicTest(false); try {
test.runBasicTest(false);
// create hot files with replication 3 until not more spaces. // create hot files with replication 3 until not more spaces.
final short replication = 3; final short replication = 3;
{ {
int hotFileCount = 0; int hotFileCount = 0;
try { try {
for(; ; hotFileCount++) { for (; ; hotFileCount++) {
final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount); final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
}
} catch (IOException e) {
LOG.info("Expected: hotFileCount=" + hotFileCount, e);
} }
} catch(IOException e) { Assert.assertTrue(hotFileCount >= 1);
LOG.info("Expected: hotFileCount=" + hotFileCount, e);
} }
Assert.assertTrue(hotFileCount >= 2);
}
// create hot files with replication 1 to use up all remaining spaces. // create hot files with replication 1 to use up all remaining spaces.
{ {
int hotFileCount_r1 = 0; int hotFileCount_r1 = 0;
try { try {
for(; ; hotFileCount_r1++) { for (; ; hotFileCount_r1++) {
final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1); final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
}
} catch (IOException e) {
LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
} }
} catch(IOException e) {
LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
} }
{ // test increasing replication. Since DISK is full,
// new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0);
final short newReplication = (short) 5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk);
}
{ // test creating a cold file and then increase replication
final Path p = new Path(pathPolicyMap.cold, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
test.verifyReplication(p, 0, replication);
final short newReplication = 5;
test.dfs.setReplication(p, newReplication);
Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication);
}
{ //test move a hot file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
}
} finally {
test.shutdownCluster();
} }
{ // test increasing replication. Since DISK is full,
// new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0);
final short newReplication = (short)5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk);
}
{ // test creating a cold file and then increase replication
final Path p = new Path(pathPolicyMap.cold, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
test.verifyReplication(p, 0, replication);
final short newReplication = 5;
test.dfs.setReplication(p, newReplication);
Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication);
}
{ //test move a hot file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());;
}
test.shutdownCluster();
} }
/** /**
@ -572,73 +598,77 @@ public void testNoSpaceDisk() throws Exception {
*/ */
@Test @Test
public void testNoSpaceArchive() throws Exception { public void testNoSpaceArchive() throws Exception {
LOG.info("testNoSpaceArchive");
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = 100*BLOCK_SIZE; final long diskCapacity = 100 * BLOCK_SIZE;
final long archiveCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
* BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity); diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.runBasicTest(false); try {
test.runBasicTest(false);
// create cold files with replication 3 until not more spaces. // create cold files with replication 3 until not more spaces.
final short replication = 3; final short replication = 3;
{ {
int coldFileCount = 0; int coldFileCount = 0;
try { try {
for(; ; coldFileCount++) { for (; ; coldFileCount++) {
final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount); final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
}
} catch (IOException e) {
LOG.info("Expected: coldFileCount=" + coldFileCount, e);
} }
} catch(IOException e) { Assert.assertTrue(coldFileCount >= 1);
LOG.info("Expected: coldFileCount=" + coldFileCount, e);
} }
Assert.assertTrue(coldFileCount >= 2);
}
// create cold files with replication 1 to use up all remaining spaces. // create cold files with replication 1 to use up all remaining spaces.
{ {
int coldFileCount_r1 = 0; int coldFileCount_r1 = 0;
try { try {
for(; ; coldFileCount_r1++) { for (; ; coldFileCount_r1++) {
final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1); final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
}
} catch (IOException e) {
LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
} }
} catch(IOException e) {
LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
} }
{ // test increasing replication but new replicas cannot be created
// since no more ARCHIVE space.
final Path file0 = new Path(pathPolicyMap.cold, "file0");
final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
Assert.assertEquals(0, r.disk);
final short newReplication = (short) 5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, 0, r.archive);
}
{ // test creating a hot file
final Path p = new Path(pathPolicyMap.hot, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 3, 0L);
}
{ //test move a cold file to warm
final Path file1 = new Path(pathPolicyMap.cold, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verify(true);
}
} finally {
test.shutdownCluster();
} }
{ // test increasing replication but new replicas cannot be created
// since no more ARCHIVE space.
final Path file0 = new Path(pathPolicyMap.cold, "file0");
final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
Assert.assertEquals(0, r.disk);
final short newReplication = (short)5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, 0, r.archive);
}
{ // test creating a hot file
final Path p = new Path(pathPolicyMap.hot, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L);
}
{ //test move a cold file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verify(true);
}
test.shutdownCluster();
} }
} }