diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 41ea1f3866..98bd58e377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -43,6 +43,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -166,6 +167,10 @@ int size() { void clear() { map.clear(); } + + public Collection values() { + return map.values(); + } } /** This class keeps track of a scheduled block move */ @@ -306,6 +311,7 @@ private void dispatch() { LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); + target.getDDatanode().setHasFailure(); // Proxy or target may have some issues, delay before using these nodes // further in order to avoid a potential storm of "threads quota // exceeded" warnings when the dispatcher gets out of sync with work @@ -366,6 +372,19 @@ public static class DBlock extends MovedBlocks.Locations { public DBlock(Block block) { super(block); } + + @Override + public synchronized boolean isLocatedOn(StorageGroup loc) { + // currently we only check if replicas are located on the same DataNodes + // since we do not have the capability to store two replicas in the same + // DataNode even though they are on two different storage types + for (StorageGroup existing : locations) { + if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) { + return true; + } + } + return false; + } } /** The class represents a desired move. */ @@ -469,6 +488,7 @@ public String toString() { protected long delayUntil = 0L; /** blocks being moved but not confirmed yet */ private final List pendings; + private volatile boolean hasFailure = false; private final int maxConcurrentMoves; @Override @@ -538,6 +558,10 @@ synchronized boolean addPendingBlock(PendingMove pendingBlock) { synchronized boolean removePendingBlock(PendingMove pendingBlock) { return pendings.remove(pendingBlock); } + + void setHasFailure() { + this.hasFailure = true; + } } /** A node that can be the sources of a block move */ @@ -884,7 +908,7 @@ public void run() { } // wait for all block moving to be done - waitForMoveCompletion(); + waitForMoveCompletion(targets); return bytesMoved.get() - bytesLastMoved; } @@ -892,23 +916,25 @@ public void run() { /** The sleeping period before checking if block move is completed again */ static private long blockMoveWaitTime = 30000L; - /** set the sleeping period for block move completion check */ - static void setBlockMoveWaitTime(long time) { - blockMoveWaitTime = time; - } - - /** Wait for all block move confirmations. */ - private void waitForMoveCompletion() { + /** + * Wait for all block move confirmations. + * @return true if there is failed move execution + */ + public static boolean waitForMoveCompletion( + Iterable targets) { + boolean hasFailure = false; for(;;) { boolean empty = true; for (StorageGroup t : targets) { if (!t.getDDatanode().isPendingQEmpty()) { empty = false; break; + } else { + hasFailure |= t.getDDatanode().hasFailure; } } if (empty) { - return; //all pending queues are empty + return hasFailure; // all pending queues are empty } try { Thread.sleep(blockMoveWaitTime); @@ -919,7 +945,7 @@ private void waitForMoveCompletion() { /** * Decide if the block is a good candidate to be moved from source to target. - * A block is a good candidate if + * A block is a good candidate if * 1. the block is not in the process of being moved/has not been moved; * 2. the block does not have a replica on the target; * 3. doing the move does not reduce the number of racks that the block has @@ -986,7 +1012,7 @@ private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target, * Check if there are any replica (other than source) on the same node group * with target. If true, then target is not a good candidate for placing * specific replica as we don't want 2 replicas under the same nodegroup. - * + * * @return true if there are any replica (other than source) on the same node * group with target */ @@ -1011,9 +1037,17 @@ void reset(Configuration conf) { movedBlocks.cleanup(); } + /** set the sleeping period for block move completion check */ + @VisibleForTesting + public static void setBlockMoveWaitTime(long time) { + blockMoveWaitTime = time; + } + /** shutdown thread pools */ public void shutdownNow() { - dispatchExecutor.shutdownNow(); + if (dispatchExecutor != null) { + dispatchExecutor.shutdownNow(); + } moveExecutor.shutdownNow(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java index 557bfd36ab..18b9cd8ecf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java @@ -40,7 +40,7 @@ public class MovedBlocks { public static class Locations { private final Block block; // the block /** The locations of the replicas of the block. */ - private final List locations = new ArrayList(3); + protected final List locations = new ArrayList(3); public Locations(Block block) { this.block = block; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 4dbe1d37c8..2bb1317ad5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.mover; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -130,9 +131,8 @@ void init() throws IOException { private ExitStatus run() { try { init(); - new Processor().processNamespace(); - - return ExitStatus.IN_PROGRESS; + boolean hasRemaining = new Processor().processNamespace(); + return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS; } catch (IllegalArgumentException e) { System.out.println(e + ". Exiting ..."); return ExitStatus.ILLEGAL_ARGUMENTS; @@ -223,16 +223,29 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException { } } - private void processNamespace() { + /** + * @return whether there is still remaining migration work for the next + * round + */ + private boolean processNamespace() { getSnapshottableDirs(); + boolean hasRemaining = true; try { - processDirRecursively("", dfs.getFileInfo("/")); + hasRemaining = processDirRecursively("", dfs.getFileInfo("/")); } catch (IOException e) { LOG.warn("Failed to get root directory status. Ignore and continue.", e); } + // wait for pending move to finish and retry the failed migration + hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values()); + return hasRemaining; } - private void processChildrenList(String fullPath) { + /** + * @return whether there is still remaing migration work for the next + * round + */ + private boolean processChildrenList(String fullPath) { + boolean hasRemaining = false; for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { final DirectoryListing children; try { @@ -240,124 +253,128 @@ private void processChildrenList(String fullPath) { } catch(IOException e) { LOG.warn("Failed to list directory " + fullPath + ". Ignore the directory and continue.", e); - return; + return hasRemaining; } if (children == null) { - return; + return hasRemaining; } for (HdfsFileStatus child : children.getPartialListing()) { - processDirRecursively(fullPath, child); + hasRemaining |= processDirRecursively(fullPath, child); } - if (!children.hasMore()) { + if (children.hasMore()) { lastReturnedName = children.getLastName(); } else { - return; + return hasRemaining; } } } - private void processDirRecursively(String parent, HdfsFileStatus status) { + /** @return whether the migration requires next round */ + private boolean processDirRecursively(String parent, + HdfsFileStatus status) { String fullPath = status.getFullName(parent); - if (status.isSymlink()) { - return; //ignore symlinks - } else if (status.isDir()) { + boolean hasRemaining = false; + if (status.isDir()) { if (!fullPath.endsWith(Path.SEPARATOR)) { - fullPath = fullPath + Path.SEPARATOR; + fullPath = fullPath + Path.SEPARATOR; } - processChildrenList(fullPath); + hasRemaining = processChildrenList(fullPath); // process snapshots if this is a snapshottable directory if (snapshottableDirs.contains(fullPath)) { final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR; - processChildrenList(dirSnapshot); + hasRemaining |= processChildrenList(dirSnapshot); } - } else { // file + } else if (!status.isSymlink()) { // file try { - if (isSnapshotPathInCurrent(fullPath)) { + if (!isSnapshotPathInCurrent(fullPath)) { // the full path is a snapshot path but it is also included in the // current directory tree, thus ignore it. - return; + hasRemaining = processFile((HdfsLocatedFileStatus)status); } } catch (IOException e) { LOG.warn("Failed to check the status of " + parent + ". Ignore it and continue.", e); - return; + return false; } - processFile(parent, (HdfsLocatedFileStatus)status); } + return hasRemaining; } - private void processFile(String parent, HdfsLocatedFileStatus status) { + /** @return true if it is necessary to run another round of migration */ + private boolean processFile(HdfsLocatedFileStatus status) { final BlockStoragePolicy policy = blockStoragePolicies.getPolicy( status.getStoragePolicy()); final List types = policy.chooseStorageTypes( status.getReplication()); - final LocatedBlocks locations = status.getBlockLocations(); - for(LocatedBlock lb : locations.getLocatedBlocks()) { - final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); + final LocatedBlocks locatedBlocks = status.getBlockLocations(); + boolean hasRemaining = false; + for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + final StorageTypeDiff diff = new StorageTypeDiff(types, + lb.getStorageTypes()); if (!diff.removeOverlap()) { - scheduleMoves4Block(diff, lb); + if (scheduleMoves4Block(diff, lb)) { + hasRemaining |= (diff.existing.size() > 1 && + diff.expected.size() > 1); + } else { + hasRemaining = true; + } } } + return hasRemaining; } - void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { + boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { final List locations = MLocation.toLocations(lb); Collections.shuffle(locations); final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations); - for(final Iterator i = diff.existing.iterator(); i.hasNext(); ) { - final StorageType t = i.next(); - for(final Iterator j = locations.iterator(); j.hasNext(); ) { - final MLocation ml = j.next(); - final Source source = storages.getSource(ml); + for (final StorageType t : diff.existing) { + for (final MLocation ml : locations) { + final Source source = storages.getSource(ml); if (ml.storageType == t) { - // try to schedule replica move. - if (scheduleMoveReplica(db, ml, source, diff.expected)) { - i.remove(); - j.remove(); - return; + // try to schedule one replica move. + if (scheduleMoveReplica(db, source, diff.expected)) { + return true; } } } } + return false; } + @VisibleForTesting boolean scheduleMoveReplica(DBlock db, MLocation ml, - List targetTypes) { - return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes); + List targetTypes) { + return scheduleMoveReplica(db, storages.getSource(ml), targetTypes); } - boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source, + boolean scheduleMoveReplica(DBlock db, Source source, List targetTypes) { if (dispatcher.getCluster().isNodeGroupAware()) { - if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) { + if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) { return true; } } // Then, match nodes on the same rack - if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) { + if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) { return true; } // At last, match all remaining nodes - if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) { - return true; - } - return false; + return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER); } - boolean chooseTarget(DBlock db, MLocation ml, Source source, + boolean chooseTarget(DBlock db, Source source, List targetTypes, Matcher matcher) { final NetworkTopology cluster = dispatcher.getCluster(); - for(final Iterator i = targetTypes.iterator(); i.hasNext(); ) { - final StorageType t = i.next(); + for (StorageType t : targetTypes) { for(StorageGroup target : storages.getTargetStorages(t)) { - if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) { + if (matcher.match(cluster, source.getDatanodeInfo(), + target.getDatanodeInfo())) { final PendingMove pm = source.addPendingMove(db, target); if (pm != null) { - i.remove(); dispatcher.executePendingMove(pm); return true; } @@ -367,7 +384,6 @@ boolean chooseTarget(DBlock db, MLocation ml, Source source, return false; } } - static class MLocation { final DatanodeInfo datanode; @@ -392,7 +408,8 @@ static List toLocations(LocatedBlock lb) { } } - private static class StorageTypeDiff { + @VisibleForTesting + static class StorageTypeDiff { final List expected; final List existing; @@ -403,7 +420,8 @@ private static class StorageTypeDiff { /** * Remove the overlap between the expected types and the existing types. - * @return if the existing types is empty after removed the overlap. + * @return if the existing types or the expected types is empty after + * removing the overlap. */ boolean removeOverlap() { for(Iterator i = existing.iterator(); i.hasNext(); ) { @@ -412,38 +430,42 @@ boolean removeOverlap() { i.remove(); } } - return existing.isEmpty(); + return expected.isEmpty() || existing.isEmpty(); } } static int run(Collection namenodes, Configuration conf) throws IOException, InterruptedException { - final long sleeptime = 2000*conf.getLong( + final long sleeptime = 2000 * conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); LOG.info("namenodes = " + namenodes); List connectors = Collections.emptyList(); try { - connectors = NameNodeConnector.newNameNodeConnectors(namenodes, + connectors = NameNodeConnector.newNameNodeConnectors(namenodes, Mover.class.getSimpleName(), MOVER_ID_PATH, conf); - while (true) { + while (connectors.size() > 0) { Collections.shuffle(connectors); - for(NameNodeConnector nnc : connectors) { + Iterator iter = connectors.iterator(); + while (iter.hasNext()) { + NameNodeConnector nnc = iter.next(); final Mover m = new Mover(nnc, conf); final ExitStatus r = m.run(); - if (r != ExitStatus.IN_PROGRESS) { - //must be an error statue, return. + if (r == ExitStatus.SUCCESS) { + iter.remove(); + } else if (r != ExitStatus.IN_PROGRESS) { + // must be an error statue, return return r.getExitCode(); } } - Thread.sleep(sleeptime); } + return ExitStatus.SUCCESS.getExitCode(); } finally { - for(NameNodeConnector nnc : connectors) { + for (NameNodeConnector nnc : connectors) { IOUtils.cleanup(LOG, nnc); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java new file mode 100644 index 0000000000..d2a7fcc354 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.mover; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher; +import org.apache.hadoop.hdfs.server.balancer.ExitStatus; +import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; +import org.apache.hadoop.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.util.*; + +/** + * Test the data migration tool (for Archival Storage) + */ +public class TestStorageMover { + private static final long BLOCK_SIZE = 1024; + private static final short REPL = 3; + private static final int NUM_DATANODES = 6; + private static final Configuration DEFAULT_CONF = new HdfsConfiguration(); + private static final BlockStoragePolicy.Suite DEFAULT_POLICIES; + private static final BlockStoragePolicy HOT; + private static final BlockStoragePolicy WARM; + private static final BlockStoragePolicy COLD; + + static { + DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new + HdfsConfiguration()); + HOT = DEFAULT_POLICIES.getPolicy("HOT"); + WARM = DEFAULT_POLICIES.getPolicy("WARM"); + COLD = DEFAULT_POLICIES.getPolicy("COLD"); + Dispatcher.setBlockMoveWaitTime(10 * 1000); + } + + /** + * This scheme defines files/directories and their block storage policies. It + * also defines snapshots. + */ + static class NamespaceScheme { + final List files; + final Map> snapshotMap; + final Map policyMap; + + NamespaceScheme(List files, Map> snapshotMap, + Map policyMap) { + this.files = files; + this.snapshotMap = snapshotMap == null ? + new HashMap>() : snapshotMap; + this.policyMap = policyMap; + } + } + + /** + * This scheme defines DataNodes and their storage, including storage types + * and remaining capacities. + */ + static class ClusterScheme { + final Configuration conf; + final int numDataNodes; + final short repl; + final StorageType[][] storageTypes; + final long[][] storageCapacities; + + ClusterScheme(Configuration conf, int numDataNodes, short repl, + StorageType[][] types, long[][] capacities) { + Preconditions.checkArgument(types == null || types.length == numDataNodes); + Preconditions.checkArgument(capacities == null || capacities.length == + numDataNodes); + this.conf = conf; + this.numDataNodes = numDataNodes; + this.repl = repl; + this.storageTypes = types; + this.storageCapacities = capacities; + } + } + + class MigrationTest { + private final ClusterScheme clusterScheme; + private final NamespaceScheme nsScheme; + private final Configuration conf; + + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + private final BlockStoragePolicy.Suite policies; + + MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) { + this.clusterScheme = cScheme; + this.nsScheme = nsScheme; + this.conf = clusterScheme.conf; + this.policies = BlockStoragePolicy.readBlockStorageSuite(conf); + } + + /** + * Set up the cluster and start NameNode and DataNodes according to the + * corresponding scheme. + */ + void setupCluster() throws Exception { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme + .numDataNodes).storageTypes(clusterScheme.storageTypes) + .storageCapacities(clusterScheme.storageCapacities).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + } + + void shutdownCluster() throws Exception { + IOUtils.cleanup(null, dfs); + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Create files/directories and set their storage policies according to the + * corresponding scheme. + */ + void prepareNamespace() throws Exception { + for (Path file : nsScheme.files) { + DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl, + 0L); + } + for (Map.Entry> entry : nsScheme.snapshotMap.entrySet()) { + for (String snapshot : entry.getValue()) { + SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot); + } + } + for (Map.Entry entry : nsScheme.policyMap.entrySet()) { + dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName()); + } + } + + /** + * Run the migration tool. + */ + void migrate(String... args) throws Exception { + runMover(); + } + + /** + * Verify block locations after running the migration tool. + */ + void verify(boolean verifyAll) throws Exception { + if (verifyAll) { + verifyNamespace(); + } else { + // TODO verify according to the given path list + + } + } + + private void runMover() throws Exception { + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + int result = Mover.run(namenodes, conf); + Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result); + } + + private void verifyNamespace() throws Exception { + HdfsFileStatus status = dfs.getClient().getFileInfo("/"); + verifyRecursively(null, status); + } + + private void verifyRecursively(final Path parent, + final HdfsFileStatus status) throws Exception { + if (status.isDir()) { + Path fullPath = parent == null ? + new Path("/") : status.getFullPath(parent); + DirectoryListing children = dfs.getClient().listPaths( + fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true); + for (HdfsFileStatus child : children.getPartialListing()) { + verifyRecursively(fullPath, child); + } + } else if (!status.isSymlink()) { // is file + HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; + byte policyId = fileStatus.getStoragePolicy(); + BlockStoragePolicy policy = policies.getPolicy(policyId); + final List types = policy.chooseStorageTypes( + status.getReplication()); + for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) { + final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, + lb.getStorageTypes()); + Assert.assertTrue(diff.removeOverlap()); + } + } + } + } + + private static StorageType[][] genStorageTypes(int numDataNodes) { + StorageType[][] types = new StorageType[numDataNodes][]; + for (int i = 0; i < types.length; i++) { + types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}; + } + return types; + } + + private void runTest(MigrationTest test) throws Exception { + test.setupCluster(); + try { + test.prepareNamespace(); + test.migrate(); + Thread.sleep(5000); // let the NN finish deletion + test.verify(true); + } finally { + test.shutdownCluster(); + } + } + + /** + * A normal case for Mover: move a file into archival storage + */ + @Test + public void testMigrateFileToArchival() throws Exception { + final Path foo = new Path("/foo"); + Map policyMap = Maps.newHashMap(); + policyMap.put(foo, COLD); + NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null, + policyMap); + ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); + MigrationTest test = new MigrationTest(clusterScheme, nsScheme); + runTest(test); + } +}