diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 39a0051089..b141502c0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -97,23 +97,53 @@ public synchronized void add(ItemInfo trackInfo) { } /** - * Add the itemInfo to tracking list for which storage movement - * expected if necessary. + * Add the itemInfo list to the tracking list for which storage movement is + * expected, if necessary. + * * @param startId - * - start id + * - start id * @param itemInfoList - * - List of child in the directory + * - List of children in the directory + * @param scanCompleted + * - Indicates whether the start id directory has no more elements to + * scan. */ @VisibleForTesting - public synchronized void addAll(long startId, - List<ItemInfo> itemInfoList, boolean scanCompleted) { + public synchronized void addAll(long startId, List<ItemInfo> itemInfoList, + boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); + updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted); + } + + /** + * Add the itemInfo to the tracking list for which storage movement is + * expected, if necessary. + * + * @param itemInfo + * - child ItemInfo to be tracked + * @param scanCompleted + * - Indicates whether the ItemInfo start id directory has no more + * elements to scan. + */ + @VisibleForTesting + public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { + storageMovementNeeded.add(itemInfo); + // The SPS start id is a file itself, so no need to update pending dir + // stats. + if (itemInfo.getStartId() == itemInfo.getFileId()) { + return; + } + updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted); + } + + private void updatePendingDirScanStats(long startId, int numScannedFiles, + boolean scanCompleted) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); if (pendingWork == null) { pendingWork = new DirPendingWorkInfo(); pendingWorkForDirectory.put(startId, pendingWork); } - pendingWork.addPendingWorkCount(itemInfoList.size()); + pendingWork.addPendingWorkCount(numScannedFiles); if (scanCompleted) { pendingWork.markScanCompleted(); } @@ -250,13 +280,15 @@ private class SPSPathIdProcessor implements Runnable { @Override public void run() { - LOG.info("Starting FileInodeIdCollector!."); + LOG.info("Starting SPSPathIdProcessor!."); long lastStatusCleanTime = 0; + Long startINodeId = null; while (ctxt.isRunning()) { - LOG.info("Running FileInodeIdCollector!."); try { if (!ctxt.isInSafeMode()) { - Long startINodeId = ctxt.getNextSPSPathId(); + if (startINodeId == null) { + startINodeId = ctxt.getNextSPSPathId(); + } // else same id will be retried if (startINodeId == null) { // Waiting for SPS path Thread.sleep(3000); @@ -281,9 +313,18 @@ public void run() { lastStatusCleanTime = Time.monotonicNow(); cleanSpsStatus(); } + startINodeId = null; // Current inode id successfully scanned. } } catch (Throwable t) { - LOG.warn("Exception while loading inodes to satisfy the policy", t); + String reClass = t.getClass().getName(); + if (InterruptedException.class.getName().equals(reClass)) { + LOG.info("SPSPathIdProcessor thread is interrupted. 
Stopping.."); + Thread.currentThread().interrupt(); + break; + } + LOG.warn("Exception while scanning file inodes to satisfy the policy", + t); + // TODO: may be we should retry the current inode id? } } } @@ -426,4 +467,11 @@ public static void setStatusClearanceElapsedTimeMs( public static long getStatusClearanceElapsedTimeMs() { return statusClearanceElapsedTimeMs; } + + public void markScanCompletedForDir(Long inodeId) { + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId); + if (pendingWork != null) { + pendingWork.markScanCompleted(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index b7053b9a8a..f103dfe61c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -167,4 +167,12 @@ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, */ void removeAllSPSPathIds(); + /** + * Gets the file path for a given inode id. + * + * @param inodeId + * - path inode id. + */ + String getFilePath(Long inodeId); + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java index 1da4af98c2..b27e8c98b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -29,6 +30,7 @@ * This class handles the internal SPS block movements. This will assign block * movement tasks to target datanode descriptors. */ +@InterfaceAudience.Private public class IntraSPSNameNodeBlockMoveTaskHandler implements BlockMoveTaskHandler { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index cef26ed418..aed684ab80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -46,6 +47,7 @@ * are expecting to change its storages and assigning the block storage * movements to satisfy the storage policy. 
*/ +@InterfaceAudience.Private public class IntraSPSNameNodeContext implements Context { private static final Logger LOG = LoggerFactory .getLogger(IntraSPSNameNodeContext.class); @@ -195,4 +197,9 @@ public void removeSPSPathId(long trackId) { public void removeAllSPSPathIds() { blockManager.removeAllSPSPathIds(); } + + @Override + public String getFilePath(Long inodeId) { + return namesystem.getFilePath(inodeId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java index c6834c1c95..f7cd754566 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; @@ -32,6 +33,7 @@ * A specific implementation for scanning the directory with Namenode internal * Inode structure and collects the file ids under the given directory ID. */ +@InterfaceAudience.Private public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser implements FileIdCollector { private int maxQueueLimitToScan; @@ -131,12 +133,12 @@ public void scanAndCollectFileIds(final Long startINodeId) } else { readLock(); - // NOTE: this lock will not be held until full directory scanning. It is + // NOTE: this lock will not be held for full directory scanning. It is // basically a sliced locking. Once it collects a batch size( at max the // size of maxQueueLimitToScan (default 1000)) file ids, then it will // unlock and submits the current batch to SPSService. Once // service.processingQueueSize() shows empty slots, then lock will be - // resumed and scan also will be resumed. This logic was re-used from + // re-acquired and scan will be resumed. This logic was re-used from // EDEK feature. try { traverseDir(startInode.asDirectory(), startINodeId, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index 6d85ea69a0..d74e391544 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -80,7 +80,7 @@ void init(Context ctxt, FileIdCollector fileIDCollector, * * @param itemInfo */ - void addFileIdToProcess(ItemInfo itemInfo); + void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted); /** * Adds all the Item information(file id etc) to processing queue. @@ -104,4 +104,12 @@ void addAllFileIdsToProcess(long startId, List itemInfoList, * @return the configuration. */ Configuration getConf(); + + /** + * Marks the scanning of directory if finished. + * + * @param inodeId + * - directory inode id. 
+ */ + void markScanCompletedForPath(Long inodeId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 28c1372b52..aafdc65cf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -563,7 +563,6 @@ private boolean findSourceAndTargetToMove( chosenTarget.storageType, blockMovingInfos); } expected.remove(chosenTarget.storageType); - // TODO: We can increment scheduled block count for this node? } } // To avoid choosing this excludeNodes as targets later @@ -924,7 +923,7 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( } @Override - public void addFileIdToProcess(ItemInfo trackInfo) { + public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo); } @@ -948,4 +947,9 @@ public Configuration getConf() { public BlockStorageMovementNeeded getStorageMovementQueue() { return storageMovementNeeded; } + + @Override + public void markScanCompletedForPath(Long inodeId) { + getStorageMovementQueue().markScanCompletedForDir(inodeId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java new file mode 100644 index 0000000000..597a7d319c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java @@ -0,0 +1,156 @@ +package org.apache.hadoop.hdfs.server.sps; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; +import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is to scan the paths recursively. 
If a path is a directory, it + * will recursively scan for files under it. If the path is a file, it will + * just submit that file for processing. + */ +@InterfaceAudience.Private +public class ExternalSPSFileIDCollector implements FileIdCollector { + public static final Logger LOG = + LoggerFactory.getLogger(ExternalSPSFileIDCollector.class); + private Context cxt; + private DistributedFileSystem dfs; + private SPSService service; + private int maxQueueLimitToScan; + + public ExternalSPSFileIDCollector(Context cxt, SPSService service, + int batchSize) { + this.cxt = cxt; + this.service = service; + this.maxQueueLimitToScan = service.getConf().getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); + try { + // TODO: probably we could get this dfs from external context? but this is + // too specific to external. + dfs = getFS(service.getConf()); + } catch (IOException e) { + LOG.error("Unable to get the filesystem. Make sure the Namenode is running " + + "and the configured namenode address is correct.", e); + } + } + + private DistributedFileSystem getFS(Configuration conf) throws IOException { + return (DistributedFileSystem) FileSystem + .get(FileSystem.getDefaultUri(conf), conf); + } + + /** + * Recursively scan the given path and add the file info to SPS service for + * processing. + */ + private void processPath(long startID, String fullPath) { + for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { + final DirectoryListing children; + try { + children = dfs.getClient().listPaths(fullPath, lastReturnedName, false); + } catch (IOException e) { + LOG.warn("Failed to list directory " + fullPath + + ". Ignore the directory and continue.", e); + return; + } + if (children == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The scanning start dir/sub dir " + fullPath + + " does not have children."); + } + return; + } + + for (HdfsFileStatus child : children.getPartialListing()) { + if (child.isFile()) { + service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()), + false); + checkProcessingQueuesFree(); + } else { + String fullPathStr = child.getFullName(fullPath); + if (child.isDirectory()) { + if (!fullPathStr.endsWith(Path.SEPARATOR)) { + fullPathStr = fullPathStr + Path.SEPARATOR; + } + processPath(startID, fullPathStr); + } + } + } + + if (children.hasMore()) { + lastReturnedName = children.getLastName(); + } else { + return; + } + } + } + + private void checkProcessingQueuesFree() { + int remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + remainingCapacity = remainingCapacity(); + } + } + + /** + * Returns the remaining queue capacity. 
+ */ + public int remainingCapacity() { + int size = service.processingQueueSize(); + if (size >= maxQueueLimitToScan) { + return 0; + } else { + return (maxQueueLimitToScan - size); + } + } + + @Override + public void scanAndCollectFileIds(Long inodeId) throws IOException { + if (dfs == null) { + dfs = getFS(service.getConf()); + } + processPath(inodeId, cxt.getFilePath(inodeId)); + service.markScanCompletedForPath(inodeId); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java new file mode 100644 index 0000000000..f705df2ba1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides a mechanism for satisfying the storage policy of a + * path. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.hdfs.server.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 935404424e..e0bf410528 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -71,6 +71,7 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,18 +94,41 @@ public class TestStoragePolicySatisfier { private static final String COLD = "COLD"; private static final Logger LOG = LoggerFactory.getLogger(TestStoragePolicySatisfier.class); - private final Configuration config = new HdfsConfiguration(); + private Configuration config = null; private StorageType[][] allDiskTypes = new StorageType[][]{{StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.DISK}}; private MiniDFSCluster hdfsCluster = null; - final private int numOfDatanodes = 3; - final private int storagesPerDatanode = 2; - final private long capacity = 2 * 256 * 1024 * 1024; - final private String file = "/testMoveWhenStoragePolicyNotSatisfying"; private DistributedFileSystem dfs = null; - private static final 
int DEFAULT_BLOCK_SIZE = 1024; + public static final int NUM_OF_DATANODES = 3; + public static final int STORAGES_PER_DATANODE = 2; + public static final long CAPACITY = 2 * 256 * 1024 * 1024; + public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying"; + public static final int DEFAULT_BLOCK_SIZE = 1024; + + /** + * Sets hdfs cluster. + */ + public void setCluster(MiniDFSCluster cluster) { + this.hdfsCluster = cluster; + } + + /** + * @return conf. + */ + public Configuration getConf() { + return this.config; + } + + /** + * Gets distributed file system. + * + * @throws IOException + */ + public void getFS() throws IOException { + this.dfs = hdfsCluster.getFileSystem(); + } @After public void shutdownCluster() { @@ -113,14 +137,19 @@ public void shutdownCluster() { } } - private void createCluster() throws IOException { + public void createCluster() throws IOException { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); - hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); + hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY); + getFS(); + writeContent(FILE); + } + + @Before + public void setUp() { + config = new HdfsConfiguration(); } @Test(timeout = 300000) @@ -137,19 +166,19 @@ public void testWhenStoragePolicySetToCOLD() private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsCluster.triggerHeartbeats(); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000, dfs); } @@ -159,7 +188,7 @@ public void testWhenStoragePolicySetToALLSSD() try { createCluster(); // Change policy to ALL_SSD - dfs.setStoragePolicy(new Path(file), "ALL_SSD"); + dfs.setStoragePolicy(new Path(FILE), "ALL_SSD"); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}, @@ -168,14 +197,13 @@ public void testWhenStoragePolicySetToALLSSD() // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. 
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -187,23 +215,22 @@ public void testWhenStoragePolicySetToONESSD() try { createCluster(); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); } finally { shutdownCluster(); } @@ -218,23 +245,22 @@ public void testBlksStorageMovementAttemptFinishedReport() throws Exception { try { createCluster(); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till the block is moved to SSD areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -251,7 +277,7 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception { try { createCluster(); List files = new ArrayList<>(); - files.add(file); + files.add(FILE); // Creates 4 more files. Send all of them for satisfying the storage // policy together. @@ -271,8 +297,8 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception { // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. 
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsCluster.triggerHeartbeats(); for (String fileName : files) { @@ -300,21 +326,21 @@ public void testSatisfyFileWithHdfsAdmin() throws Exception { HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - hdfsAdmin.satisfyStoragePolicy(new Path(file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000, + dfs); } finally { shutdownCluster(); } @@ -344,8 +370,8 @@ public void testSatisfyDirWithHdfsAdmin() throws Exception { StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsAdmin.satisfyStoragePolicy(new Path(subDir)); @@ -384,11 +410,11 @@ public void testSatisfyWithExceptions() throws Exception { new HdfsAdmin(FileSystem.getDefaultUri(config), config); try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); Assert.fail(String.format( "Should failed to satisfy storage policy " + "for %s since %s is set to false.", - file, DFS_STORAGE_POLICY_ENABLED_KEY)); + FILE, DFS_STORAGE_POLICY_ENABLED_KEY)); } catch (IOException e) { Assert.assertTrue(e.getMessage().contains(String.format( "Failed to satisfy storage policy since %s is set to false.", @@ -409,17 +435,17 @@ public void testSatisfyWithExceptions() throws Exception { } try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - Assert.fail(String.format( - "Should failed to satisfy storage policy " - + "for %s ,since it has been " - + "added to satisfy movement queue.", file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); + Assert.fail(String.format("Should failed to satisfy storage policy " + + "for %s ,since it has been " + "added to satisfy movement queue.", + FILE)); } catch (IOException e) { GenericTestUtils.assertExceptionContains( String.format("Cannot request to call satisfy storage policy " + "on path %s, as this file/dir was already called for " - + "satisfying storage policy.", file), e); + + "satisfying storage policy.", FILE), + e); } } finally { shutdownCluster(); @@ -446,23 +472,23 @@ public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy() try { createCluster(); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new 
StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; // Adding ARCHIVE based datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -489,22 +515,22 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() try { createCluster(); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; // Adding DISK based datanodes - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // No block movement will be scheduled as there is no target node // available with the required storage type. waitForAttemptedItems(1, 30000); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); // Since there is no target node the item will get timed out and then // re-attempted. waitForAttemptedItems(1, 30000); @@ -628,8 +654,8 @@ public void testMoveWithBlockPinning() throws Exception { {StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}}; // Adding DISK based datanodes - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); dfs.satisfyStoragePolicy(new Path(file1)); hdfsCluster.triggerHeartbeats(); @@ -682,21 +708,21 @@ public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes() {StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.ARCHIVE}}; hdfsCluster = startCluster(config, allDiskTypes, numOfDns, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file, (short) 5); + writeContent(FILE, (short) 5); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. 
- DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -720,20 +746,19 @@ public void testBlockMoveInSameDatanodeWithONESSD() throws Exception { config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, capacity); + hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); } finally { shutdownCluster(); @@ -760,19 +785,19 @@ public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception { true); try { hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to WARM - dfs.setStoragePolicy(new Path(file), "WARM"); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.setStoragePolicy(new Path(FILE), "WARM"); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000, + dfs); } finally { shutdownCluster(); } @@ -794,31 +819,31 @@ public void testSPSWhenReplicaWithExpectedStorageAlreadyAvailableInSource() config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); // 1. Write two replica on disk - DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE, + DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE, (short) 2, 0); // 2. Change policy to COLD, so third replica will be written to ARCHIVE. - dfs.setStoragePolicy(new Path(file), "COLD"); + dfs.setStoragePolicy(new Path(FILE), "COLD"); // 3.Change replication factor to 3. - dfs.setReplication(new Path(file), (short) 3); + dfs.setReplication(new Path(FILE), (short) 3); - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000, dfs); // 4. 
Change policy to HOT, so we can move the all block to DISK. - dfs.setStoragePolicy(new Path(file), "HOT"); + dfs.setStoragePolicy(new Path(FILE), "HOT"); // 4. Satisfy the policy. - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // 5. Block should move successfully . - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); } finally { shutdownCluster(); } @@ -840,13 +865,13 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace() true); long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1); try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, dnCapacity); + hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, dnCapacity); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); Path filePath = new Path("/testChooseInSameDatanode"); final FSDataOutputStream out = dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE); @@ -869,7 +894,7 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace() for (DataNode dataNode : dataNodes) { DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); } - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // Wait for items to be processed waitForAttemptedItems(1, 30000); @@ -887,9 +912,9 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace() } hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs); } finally { shutdownCluster(); } @@ -928,7 +953,7 @@ public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles() true); try { hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -1029,8 +1054,7 @@ public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception { {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}}; - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .storageTypes(newtypes).build(); + cluster = startCluster(conf, newtypes, 3, 2, CAPACITY); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/zeroSizeFile"); @@ -1211,7 +1235,7 @@ public void testMultipleLevelDirectoryForSatisfyStoragePolicy() config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1245,7 +1269,7 @@ public void testBatchProcessingForSPSDirectory() throws Exception { config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 5); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + 
STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); List files = getDFSListOfTree(); @@ -1284,7 +1308,7 @@ public void testTraverseWhenParentDeleted() throws Exception { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1312,8 +1336,7 @@ public boolean isRunning() { } }; - FileIdCollector fileIDCollector = - new IntraSPSNameNodeFileIdCollector(fsDir, sps); + FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null); sps.getStorageMovementQueue().activate(); @@ -1323,31 +1346,20 @@ public boolean isRunning() { //Wait for thread to reach U. Thread.sleep(1000); - dfs.delete(new Path("/root/D/L"), true); - // Remove 10 element and make queue free, So other traversing will start. - for (int i = 0; i < 10; i++) { - String path = expectedTraverseOrder.remove(0); - long trackId = sps.getStorageMovementQueue().get().getFileId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - //Wait to finish tree traverse - Thread.sleep(5000); - // Check other element traversed in order and R,S should not be added in - // queue which we already removed from expected list - for (String path : expectedTraverseOrder) { - long trackId = sps.getStorageMovementQueue().get().getFileId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } + assertTraversal(expectedTraverseOrder, fsDir, sps); dfs.delete(new Path("/root"), true); } + public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, + Context ctxt) { + FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector( + hdfsCluster.getNamesystem().getFSDirectory(), sps); + return fileIDCollector; + } + /** * Test traverse when root parent got deleted. * 1. Delete L when traversing Q @@ -1362,7 +1374,7 @@ public void testTraverseWhenRootParentDeleted() throws Exception { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1378,7 +1390,6 @@ public void testTraverseWhenRootParentDeleted() throws Exception { // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. 
- // StoragePolicySatisfier sps = new StoragePolicySatisfier(config); StoragePolicySatisfier sps = new StoragePolicySatisfier(config); Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @@ -1392,9 +1403,7 @@ public boolean isRunning() { return true; } }; - - FileIdCollector fileIDCollector = - new IntraSPSNameNodeFileIdCollector(fsDir, sps); + FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null); sps.getStorageMovementQueue().activate(); @@ -1407,6 +1416,13 @@ public boolean isRunning() { dfs.delete(new Path("/root/D/L"), true); + assertTraversal(expectedTraverseOrder, fsDir, sps); + dfs.delete(new Path("/root"), true); + } + + private void assertTraversal(List expectedTraverseOrder, + FSDirectory fsDir, StoragePolicySatisfier sps) + throws InterruptedException { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); @@ -1426,7 +1442,6 @@ public boolean isRunning() { assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); } - dfs.delete(new Path("/root"), true); } /** @@ -1473,8 +1488,8 @@ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception { StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.SSD}, {StorageType.DISK, StorageType.SSD}}; - startAdditionalDNs(config, 2, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); // increase replication factor to 4 for the first 10 files and thus // initiate replica tasks @@ -1772,7 +1787,7 @@ public Boolean get() { }, 100, timeout); } - private void writeContent(final String fileName) throws IOException { + public void writeContent(final String fileName) throws IOException { writeContent(fileName, (short) 3); } @@ -1805,7 +1820,7 @@ private void startAdditionalDNs(final Configuration conf, cluster.triggerHeartbeats(); } - private MiniDFSCluster startCluster(final Configuration conf, + public MiniDFSCluster startCluster(final Configuration conf, StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, long nodeCapacity) throws IOException { long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java new file mode 100644 index 0000000000..3ced34ed90 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.sps; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; +import org.junit.Ignore; + +/** + * Tests the external sps service plugins. + */ +public class TestExternalStoragePolicySatisfier + extends TestStoragePolicySatisfier { + private StorageType[][] allDiskTypes = + new StorageType[][]{{StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}; + + @Override + public void createCluster() throws IOException { + getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + getConf().setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY)); + getFS(); + writeContent(FILE); + } + + @Override + public MiniDFSCluster startCluster(final Configuration conf, + StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, + long nodeCapacity) throws IOException { + long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; + for (int i = 0; i < numberOfDatanodes; i++) { + for (int j = 0; j < storagesPerDn; j++) { + capacities[i][j] = nodeCapacity; + } + } + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) + .storageTypes(storageTypes).storageCapacities(capacities).build(); + cluster.waitActive(); + if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + false)) { + SPSService spsService = cluster.getNameNode().getNamesystem() + .getBlockManager().getSPSService(); + spsService.stopGracefully(); + + IntraSPSNameNodeContext context = new IntraSPSNameNodeContext( + cluster.getNameNode().getNamesystem(), + cluster.getNameNode().getNamesystem().getBlockManager(), cluster + .getNameNode().getNamesystem().getBlockManager().getSPSService()); + + spsService.init(context, + new ExternalSPSFileIDCollector(context, + cluster.getNameNode().getNamesystem().getBlockManager() + .getSPSService(), + 5), + new IntraSPSNameNodeBlockMoveTaskHandler( + cluster.getNameNode().getNamesystem().getBlockManager(), + cluster.getNameNode().getNamesystem())); + spsService.start(true); + } + return cluster; + } + + @Override + public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, + Context ctxt) { + return new ExternalSPSFileIDCollector(ctxt, sps, 5); + } + + /** + * This test need not run as 
the external scan is not batch based right + now. + */ + @Ignore("ExternalFileIdCollector is not batch based right now." + " So, ignoring it.") + public void testBatchProcessingForSPSDirectory() throws Exception { + } +}
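
Reviewer note (illustrative, not part of the patch): the new external file-ID collector is wired into the NameNode-internal SPSService exactly as TestExternalStoragePolicySatisfier#startCluster does above. A minimal sketch of that wiring, assuming a running MiniDFSCluster named "cluster" and the APIs introduced in this patch (the class name ExternalSpsWiringSketch is hypothetical):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSFileIDCollector;

public final class ExternalSpsWiringSketch {
  public static void wire(MiniDFSCluster cluster) {
    // Stop the internally started satisfier before re-initializing it.
    SPSService spsService = cluster.getNameNode().getNamesystem()
        .getBlockManager().getSPSService();
    spsService.stopGracefully();

    // The context and block-move handler still live inside the NameNode in
    // this patch; only the file-id collection is externalized.
    Context context = new IntraSPSNameNodeContext(
        cluster.getNameNode().getNamesystem(),
        cluster.getNameNode().getNamesystem().getBlockManager(), spsService);
    FileIdCollector collector =
        new ExternalSPSFileIDCollector(context, spsService, 5);

    spsService.init(context, collector,
        new IntraSPSNameNodeBlockMoveTaskHandler(
            cluster.getNameNode().getNamesystem().getBlockManager(),
            cluster.getNameNode().getNamesystem()));
    spsService.start(true);
  }
}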
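Also illustrative only: ExternalSPSFileIDCollector#processPath pages through each directory with DFSClient#listPaths, using the last returned name as a cursor, and queues every file id it sees. A self-contained sketch of that listing pattern, using only the client calls the patch itself uses (the class PagedListingSketch and the println stand-in for queueing ItemInfo are hypothetical):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public final class PagedListingSketch {
  public static void listFileIds(String dir) throws IOException {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
        .get(FileSystem.getDefaultUri(conf), conf);
    byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
    while (true) {
      // needLocation=false: only names and inode ids are needed here.
      DirectoryListing children =
          dfs.getClient().listPaths(dir, lastReturnedName, false);
      if (children == null) {
        return; // nothing to list (the patch treats a null listing as no children)
      }
      for (HdfsFileStatus child : children.getPartialListing()) {
        if (child.isFile()) {
          // The collector would queue new ItemInfo(startId, child.getFileId()).
          System.out.println(child.getFileId());
        }
      }
      if (!children.hasMore()) {
        return; // no more batches for this directory
      }
      // Resume the listing after the last entry of the previous batch.
      lastReturnedName = children.getLastName();
    }
  }
}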