HDFS-13025. [SPS]: Implement a mechanism to scan the files for external SPS. Contributed by Uma Maheswara Rao G.

Rakesh Radhakrishnan 2018-01-23 20:09:26 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 8d4f74e733
commit 3159b39cf8
11 changed files with 556 additions and 170 deletions

View File

@ -97,23 +97,53 @@ public synchronized void add(ItemInfo trackInfo) {
}
/**
* Add the itemInfo to tracking list for which storage movement
* expected if necessary.
* Add the itemInfo list to the tracking list for which storage movement is
* expected, if necessary.
*
* @param startId
* - start id
* - start id
* @param itemInfoList
- List of children in the directory
- List of children in the directory
* @param scanCompleted
- Indicates whether the start id directory has no more elements to
* scan.
*/
@VisibleForTesting
public synchronized void addAll(long startId,
List<ItemInfo> itemInfoList, boolean scanCompleted) {
public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
}
/**
* Add the itemInfo to the tracking list for which storage movement is
* expected, if necessary.
*
* @param itemInfo
- child ItemInfo in the directory
* @param scanCompleted
- Indicates whether the ItemInfo's start id directory has no more
elements to scan.
*/
@VisibleForTesting
public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
storageMovementNeeded.add(itemInfo);
// The sps start id represents a file, so there is no need to update
// pending dir stats.
if (itemInfo.getStartId() == itemInfo.getFileId()) {
return;
}
updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
}
private void updatePendingDirScanStats(long startId, int numScannedFiles,
boolean scanCompleted) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork == null) {
pendingWork = new DirPendingWorkInfo();
pendingWorkForDirectory.put(startId, pendingWork);
}
pendingWork.addPendingWorkCount(itemInfoList.size());
pendingWork.addPendingWorkCount(numScannedFiles);
if (scanCompleted) {
pendingWork.markScanCompleted();
}
@ -250,13 +280,15 @@ private class SPSPathIdProcessor implements Runnable {
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0;
Long startINodeId = null;
while (ctxt.isRunning()) {
LOG.info("Running FileInodeIdCollector!.");
try {
if (!ctxt.isInSafeMode()) {
Long startINodeId = ctxt.getNextSPSPathId();
if (startINodeId == null) {
startINodeId = ctxt.getNextSPSPathId();
} // else same id will be retried
if (startINodeId == null) {
// Waiting for SPS path
Thread.sleep(3000);
@ -281,9 +313,18 @@ public void run() {
lastStatusCleanTime = Time.monotonicNow();
cleanSpsStatus();
}
startINodeId = null; // Current inode id successfully scanned.
}
} catch (Throwable t) {
LOG.warn("Exception while loading inodes to satisfy the policy", t);
String reClass = t.getClass().getName();
if (InterruptedException.class.getName().equals(reClass)) {
LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
Thread.currentThread().interrupt();
break;
}
LOG.warn("Exception while scanning file inodes to satisfy the policy",
t);
// TODO: maybe we should retry the current inode id?
}
}
}
@ -426,4 +467,11 @@ public static void setStatusClearanceElapsedTimeMs(
public static long getStatusClearanceElapsedTimeMs() {
return statusClearanceElapsedTimeMs;
}
public void markScanCompletedForDir(Long inodeId) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
if (pendingWork != null) {
pendingWork.markScanCompleted();
}
}
}
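The add()/addAll() overloads above differ only in their per-directory bookkeeping: an item whose start id equals its file id is a standalone file and skips DirPendingWorkInfo, while directory children bump the pending counter and the final submission carries scanCompleted=true so the directory can be retired once all children are processed. A small illustrative sketch follows; it is not part of the patch, assumes it sits in the org.apache.hadoop.hdfs.server.namenode.sps package so BlockStorageMovementNeeded and ItemInfo are visible, and uses made-up inode ids.

package org.apache.hadoop.hdfs.server.namenode.sps;

import java.util.ArrayList;
import java.util.List;

/** Illustrative only; not part of this patch. */
final class QueueFeedingSketch {
  private QueueFeedingSketch() {
  }

  /** Feeds one directory's children plus a standalone file into the queue. */
  static void feed(BlockStorageMovementNeeded queue) {
    long dirId = 16386L;                     // made-up directory (start) inode id
    List<ItemInfo> batch = new ArrayList<>();
    batch.add(new ItemInfo(dirId, 16387L));  // made-up child file ids
    batch.add(new ItemInfo(dirId, 16388L));
    // More children remain, so the directory scan is not complete yet.
    queue.addAll(dirId, batch, false);
    // Last child; scanCompleted=true lets DirPendingWorkInfo retire the
    // directory once every queued child has been processed.
    queue.add(new ItemInfo(dirId, 16389L), true);
    // A standalone file (startId == fileId) skips the directory stats.
    queue.add(new ItemInfo(16400L, 16400L), false);
  }
}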

View File

@ -167,4 +167,12 @@ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
*/
void removeAllSPSPathIds();
/**
* Gets the file path for a given inode id.
*
* @param inodeId
* - path inode id.
*/
String getFilePath(Long inodeId);
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -29,6 +30,7 @@
* This class handles the internal SPS block movements. This will assign block
* movement tasks to target datanode descriptors.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeBlockMoveTaskHandler
implements BlockMoveTaskHandler {

View File

@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
@ -46,6 +47,7 @@
* are expecting to change its storages and assigning the block storage
* movements to satisfy the storage policy.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
@ -195,4 +197,9 @@ public void removeSPSPathId(long trackId) {
public void removeAllSPSPathIds() {
blockManager.removeAllSPSPathIds();
}
@Override
public String getFilePath(Long inodeId) {
return namesystem.getFilePath(inodeId);
}
}

View File

@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@ -32,6 +33,7 @@
* A specific implementation that scans the directory using the Namenode's
* internal Inode structure and collects the file ids under the given
* directory ID.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
implements FileIdCollector {
private int maxQueueLimitToScan;
@ -131,12 +133,12 @@ public void scanAndCollectFileIds(final Long startINodeId)
} else {
readLock();
// NOTE: this lock will not be held until full directory scanning. It is
// NOTE: this lock will not be held for full directory scanning. It is
// basically sliced locking. Once it collects a batch of file ids (at most
// maxQueueLimitToScan, default 1000), it will
// unlock and submit the current batch to SPSService. Once
// service.processingQueueSize() shows empty slots, then lock will be
// resumed and scan also will be resumed. This logic was re-used from
// re-acquired and scan will be resumed. This logic was re-used from
// EDEK feature.
try {
traverseDir(startInode.asDirectory(), startINodeId,

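The NOTE above is the key design point of this collector: the namesystem read lock is held only while one batch of file ids is gathered, released while the batch is handed to the SPS service, and re-acquired once the queue has free slots. The following is a self-contained toy sketch of that sliced-locking shape using a generic read lock and a bounded queue; it is illustrative only and does not use the Namenode classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Toy illustration of sliced locking; not HDFS code. */
class SlicedScanSketch {
  private static final int QUEUE_LIMIT = 1000;   // mirrors maxQueueLimitToScan
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ArrayBlockingQueue<Long> pending =
      new ArrayBlockingQueue<>(QUEUE_LIMIT);

  void scan(List<Long> fileIds) throws InterruptedException {
    int next = 0;
    while (next < fileIds.size()) {
      List<Long> batch = new ArrayList<>();
      lock.readLock().lock();                    // hold the lock for one batch only
      try {
        while (next < fileIds.size() && batch.size() < QUEUE_LIMIT) {
          batch.add(fileIds.get(next++));
        }
      } finally {
        lock.readLock().unlock();                // release before handing off
      }
      for (Long id : batch) {
        pending.put(id);                         // blocks until the consumer frees slots
      }
    }
  }
}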
View File

@ -80,7 +80,7 @@ void init(Context ctxt, FileIdCollector fileIDCollector,
*
* @param itemInfo
*/
void addFileIdToProcess(ItemInfo itemInfo);
void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
/**
* Adds all the Item information (file id etc.) to the processing queue.
@ -104,4 +104,12 @@ void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
* @return the configuration.
*/
Configuration getConf();
/**
* Marks the scanning of the directory as finished.
*
* @param inodeId
* - directory inode id.
*/
void markScanCompletedForPath(Long inodeId);
}
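Taken together, addFileIdToProcess(itemInfo, scanCompleted) and markScanCompletedForPath(inodeId) give a file id collector a simple contract: queue every discovered file against the start id, then tell the service that no more items will arrive for that path. A minimal sketch of that contract is shown below; it is not part of the patch, and the childFileIds parameter merely stands in for whatever enumeration a real collector performs.

import java.util.List;
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;

/** Sketch of the collector-side contract; not part of this patch. */
final class CollectorContractSketch {
  private CollectorContractSketch() {
  }

  /**
   * Queues each discovered file id against the start id, then marks the path
   * scan completed so the service can retire its pending-directory counter.
   */
  static void submitScannedFiles(SPSService service, long startId,
      List<Long> childFileIds) {
    for (Long fileId : childFileIds) {
      // scanCompleted=false: more children may still be on the way.
      service.addFileIdToProcess(new ItemInfo(startId, fileId), false);
    }
    // No more items for this path; finish the bookkeeping.
    service.markScanCompletedForPath(startId);
  }
}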

View File

@ -563,7 +563,6 @@ private boolean findSourceAndTargetToMove(
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
// TODO: We can increment scheduled block count for this node?
}
}
// To avoid choosing this excludeNodes as targets later
@ -924,7 +923,7 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
}
@Override
public void addFileIdToProcess(ItemInfo trackInfo) {
public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo);
}
@ -948,4 +947,9 @@ public Configuration getConf() {
public BlockStorageMovementNeeded getStorageMovementQueue() {
return storageMovementNeeded;
}
@Override
public void markScanCompletedForPath(Long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
}

View File

@ -0,0 +1,156 @@
package org.apache.hadoop.hdfs.server.sps;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class scans the given paths recursively. If the path is a directory,
* then it will scan for files recursively; if the path is a regular file,
* then it will just submit that file for processing.
*/
@InterfaceAudience.Private
public class ExternalSPSFileIDCollector implements FileIdCollector {
public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSFileIDCollector.class);
private Context cxt;
private DistributedFileSystem dfs;
private SPSService service;
private int maxQueueLimitToScan;
public ExternalSPSFileIDCollector(Context cxt, SPSService service,
int batchSize) {
this.cxt = cxt;
this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
try {
// TODO: probably we could get this dfs from external context? but this is
// too specific to external.
dfs = getFS(service.getConf());
} catch (IOException e) {
LOG.error("Unable to get the filesystem. Make sure the Namenode is running "
+ "and the configured namenode address is correct.", e);
}
}
private DistributedFileSystem getFS(Configuration conf) throws IOException {
return (DistributedFileSystem) FileSystem
.get(FileSystem.getDefaultUri(conf), conf);
}
/**
* Recursively scan the given path and add the file info to SPS service for
* processing.
*/
private void processPath(long startID, String fullPath) {
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
children = dfs.getClient().listPaths(fullPath, lastReturnedName, false);
} catch (IOException e) {
LOG.warn("Failed to list directory " + fullPath
+ ". Ignoring the directory and continuing.", e);
return;
}
if (children == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The scanning start dir/sub dir " + fullPath
+ " does not have any children.");
}
return;
}
for (HdfsFileStatus child : children.getPartialListing()) {
if (child.isFile()) {
service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
false);
checkProcessingQueuesFree();
} else {
String fullPathStr = child.getFullName(fullPath);
if (child.isDirectory()) {
if (!fullPathStr.endsWith(Path.SEPARATOR)) {
fullPathStr = fullPathStr + Path.SEPARATOR;
}
processPath(startID, fullPathStr);
}
}
}
if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
return;
}
}
}
private void checkProcessingQueuesFree() {
int remainingCapacity = remainingCapacity();
// wait for queue to be free
while (remainingCapacity <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for storageMovementNeeded queue to be free!");
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
remainingCapacity = remainingCapacity();
}
}
/**
* Returns queue remaining capacity.
*/
public int remainingCapacity() {
int size = service.processingQueueSize();
if (size >= maxQueueLimitToScan) {
return 0;
} else {
return (maxQueueLimitToScan - size);
}
}
@Override
public void scanAndCollectFileIds(Long inodeId) throws IOException {
if (dfs == null) {
dfs = getFS(service.getConf());
}
processPath(inodeId, cxt.getFilePath(inodeId));
service.markScanCompletedForPath(inodeId);
}
}
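For completeness, a hypothetical driver for an external process is sketched below, mirroring what SPSPathIdProcessor does inside the Namenode: fetch the next satisfier path id from the Context and hand it to the collector, which resolves the path, queues every file beneath it and marks the scan complete. The ExternalScanDriverSketch class and its method name are invented for illustration; the actual wiring of this collector into SPSService#init appears in TestExternalStoragePolicySatisfier later in this commit.

import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSFileIDCollector;

/** Hypothetical external driver sketch; not part of this patch. */
final class ExternalScanDriverSketch {
  private ExternalScanDriverSketch() {
  }

  /** Scans the next path id queued for satisfying storage policy, if any. */
  static void scanNextSatisfierPath(Context context, SPSService service)
      throws IOException {
    ExternalSPSFileIDCollector collector =
        new ExternalSPSFileIDCollector(context, service, 5);
    Long nextPathId = context.getNextSPSPathId();
    if (nextPathId != null) {
      // Resolves the path via Context#getFilePath, queues all files under it
      // and finally calls markScanCompletedForPath on the service.
      collector.scanAndCollectFileIds(nextPathId);
    }
  }
}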

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package provides a mechanism for satisfying the storage policy of a
* path.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.hdfs.server.sps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,18 +94,41 @@ public class TestStoragePolicySatisfier {
private static final String COLD = "COLD";
private static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
private final Configuration config = new HdfsConfiguration();
private Configuration config = null;
private StorageType[][] allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
private MiniDFSCluster hdfsCluster = null;
final private int numOfDatanodes = 3;
final private int storagesPerDatanode = 2;
final private long capacity = 2 * 256 * 1024 * 1024;
final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
private DistributedFileSystem dfs = null;
private static final int DEFAULT_BLOCK_SIZE = 1024;
public static final int NUM_OF_DATANODES = 3;
public static final int STORAGES_PER_DATANODE = 2;
public static final long CAPACITY = 2 * 256 * 1024 * 1024;
public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying";
public static final int DEFAULT_BLOCK_SIZE = 1024;
/**
* Sets hdfs cluster.
*/
public void setCluster(MiniDFSCluster cluster) {
this.hdfsCluster = cluster;
}
/**
* @return conf.
*/
public Configuration getConf() {
return this.config;
}
/**
* Gets distributed file system.
*
* @throws IOException
*/
public void getFS() throws IOException {
this.dfs = hdfsCluster.getFileSystem();
}
@After
public void shutdownCluster() {
@ -113,14 +137,19 @@ public void shutdownCluster() {
}
}
private void createCluster() throws IOException {
public void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
writeContent(file);
hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
STORAGES_PER_DATANODE, CAPACITY);
getFS();
writeContent(FILE);
}
@Before
public void setUp() {
config = new HdfsConfiguration();
}
@Test(timeout = 300000)
@ -137,19 +166,19 @@ public void testWhenStoragePolicySetToCOLD()
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsCluster.triggerHeartbeats();
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
// Wait till namenode is notified about the block location details
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000,
dfs);
}
@ -159,7 +188,7 @@ public void testWhenStoragePolicySetToALLSSD()
try {
createCluster();
// Change policy to ALL_SSD
dfs.setStoragePolicy(new Path(file), "ALL_SSD");
dfs.setStoragePolicy(new Path(FILE), "ALL_SSD");
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@ -168,14 +197,13 @@ public void testWhenStoragePolicySetToALLSSD()
// Making sure SSD based nodes are added to the cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file));
startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified the block to move to SSD
// areas
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 3, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@ -187,23 +215,22 @@ public void testWhenStoragePolicySetToONESSD()
try {
createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SSD based nodes are added to the cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file));
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified the block to move to SSD
// areas
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
dfs);
} finally {
shutdownCluster();
}
@ -218,23 +245,22 @@ public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
try {
createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SSD based nodes are added to the cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file));
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@ -251,7 +277,7 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
try {
createCluster();
List<String> files = new ArrayList<>();
files.add(file);
files.add(FILE);
// Creates 4 more files. Send all of them for satisfying the storage
// policy together.
@ -271,8 +297,8 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
// Making sure SSD based nodes are added to the cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsCluster.triggerHeartbeats();
for (String fileName : files) {
@ -300,21 +326,21 @@ public void testSatisfyFileWithHdfsAdmin() throws Exception {
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}};
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsAdmin.satisfyStoragePolicy(new Path(file));
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till namenode is notified about the block location details
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 3, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000,
dfs);
} finally {
shutdownCluster();
}
@ -344,8 +370,8 @@ public void testSatisfyDirWithHdfsAdmin() throws Exception {
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsAdmin.satisfyStoragePolicy(new Path(subDir));
@ -384,11 +410,11 @@ public void testSatisfyWithExceptions() throws Exception {
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
hdfsAdmin.satisfyStoragePolicy(new Path(file));
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
Assert.fail(String.format(
"Should fail to satisfy storage policy "
+ "for %s since %s is set to false.",
file, DFS_STORAGE_POLICY_ENABLED_KEY));
FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(String.format(
"Failed to satisfy storage policy since %s is set to false.",
@ -409,17 +435,17 @@ public void testSatisfyWithExceptions() throws Exception {
}
try {
hdfsAdmin.satisfyStoragePolicy(new Path(file));
hdfsAdmin.satisfyStoragePolicy(new Path(file));
Assert.fail(String.format(
"Should fail to satisfy storage policy "
+ "for %s, since it has been "
+ "added to satisfy movement queue.", file));
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
Assert.fail(String.format("Should fail to satisfy storage policy "
+ "for %s, since it has been " + "added to satisfy movement queue.",
FILE));
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
String.format("Cannot request to call satisfy storage policy "
+ "on path %s, as this file/dir was already called for "
+ "satisfying storage policy.", file), e);
+ "satisfying storage policy.", FILE),
e);
}
} finally {
shutdownCluster();
@ -446,23 +472,23 @@ public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
try {
createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding ARCHIVE based datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified the block to move to
// ARCHIVE area.
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@ -489,22 +515,22 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
try {
createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
// Adding DISK based datanodes
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node
// available with the required storage type.
waitForAttemptedItems(1, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
dfs);
// Since there is no target node the item will get timed out and then
// re-attempted.
waitForAttemptedItems(1, 30000);
@ -628,8 +654,8 @@ public void testMoveWithBlockPinning() throws Exception {
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding DISK based datanodes
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file1));
hdfsCluster.triggerHeartbeats();
@ -682,21 +708,21 @@ public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE}};
hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
writeContent(file, (short) 5);
writeContent(FILE, (short) 5);
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
dfs.setStoragePolicy(new Path(FILE), COLD);
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified the block to move to
// ARCHIVE area.
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@ -720,20 +746,19 @@ public void testBlockMoveInSameDatanodeWithONESSD() throws Exception {
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
try {
hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
storagesPerDatanode, capacity);
hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
writeContent(file);
writeContent(FILE);
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
dfs);
} finally {
shutdownCluster();
@ -760,19 +785,19 @@ public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception {
true);
try {
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
writeContent(file);
writeContent(FILE);
// Change policy to WARM
dfs.setStoragePolicy(new Path(file), "WARM");
dfs.satisfyStoragePolicy(new Path(file));
dfs.setStoragePolicy(new Path(FILE), "WARM");
dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000,
dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
dfs);
} finally {
shutdownCluster();
}
@ -794,31 +819,31 @@ public void testSPSWhenReplicaWithExpectedStorageAlreadyAvailableInSource()
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
// 1. Write two replicas on disk
DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE,
DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE,
(short) 2, 0);
// 2. Change policy to COLD, so third replica will be written to ARCHIVE.
dfs.setStoragePolicy(new Path(file), "COLD");
dfs.setStoragePolicy(new Path(FILE), "COLD");
// 3. Change replication factor to 3.
dfs.setReplication(new Path(file), (short) 3);
dfs.setReplication(new Path(FILE), (short) 3);
DFSTestUtil
.waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
dfs);
// 4. Change policy to HOT, so we can move all the blocks to DISK.
dfs.setStoragePolicy(new Path(file), "HOT");
dfs.setStoragePolicy(new Path(FILE), "HOT");
// 4. Satisfy the policy.
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
// 5. Block should move successfully.
DFSTestUtil
.waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
dfs);
} finally {
shutdownCluster();
}
@ -840,13 +865,13 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
true);
long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
try {
hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
storagesPerDatanode, dnCapacity);
hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
STORAGES_PER_DATANODE, dnCapacity);
dfs = hdfsCluster.getFileSystem();
writeContent(file);
writeContent(FILE);
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
Path filePath = new Path("/testChooseInSameDatanode");
final FSDataOutputStream out =
dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@ -869,7 +894,7 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
for (DataNode dataNode : dataNodes) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
}
dfs.satisfyStoragePolicy(new Path(file));
dfs.satisfyStoragePolicy(new Path(FILE));
// Wait for items to be processed
waitForAttemptedItems(1, 30000);
@ -887,9 +912,9 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
}
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
dfs);
DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs);
} finally {
shutdownCluster();
}
@ -928,7 +953,7 @@ public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
true);
try {
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -1029,8 +1054,7 @@ public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK}};
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.storageTypes(newtypes).build();
cluster = startCluster(conf, newtypes, 3, 2, CAPACITY);
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/zeroSizeFile");
@ -1211,7 +1235,7 @@ public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@ -1245,7 +1269,7 @@ public void testBatchProcessingForSPSDirectory() throws Exception {
config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
5);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> files = getDFSListOfTree();
@ -1284,7 +1308,7 @@ public void testTraverseWhenParentDeleted() throws Exception {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@ -1312,8 +1336,7 @@ public boolean isRunning() {
}
};
FileIdCollector fileIDCollector =
new IntraSPSNameNodeFileIdCollector(fsDir, sps);
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.getStorageMovementQueue().activate();
@ -1323,31 +1346,20 @@ public boolean isRunning() {
//Wait for thread to reach U.
Thread.sleep(1000);
dfs.delete(new Path("/root/D/L"), true);
// Remove 10 elements to make the queue free, so other traversal will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
// Wait for the tree traversal to finish
Thread.sleep(5000);
// Check that the remaining elements are traversed in order, and that R and S
// (already removed from the expected list) are not added to the queue
for (String path : expectedTraverseOrder) {
long trackId = sps.getStorageMovementQueue().get().getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
assertTraversal(expectedTraverseOrder, fsDir, sps);
dfs.delete(new Path("/root"), true);
}
public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
Context ctxt) {
FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector(
hdfsCluster.getNamesystem().getFSDirectory(), sps);
return fileIDCollector;
}
/**
* Test traverse when root parent got deleted.
* 1. Delete L when traversing Q
@ -1362,7 +1374,7 @@ public void testTraverseWhenRootParentDeleted() throws Exception {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@ -1378,7 +1390,6 @@ public void testTraverseWhenRootParentDeleted() throws Exception {
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
// StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) {
@ -1392,9 +1403,7 @@ public boolean isRunning() {
return true;
}
};
FileIdCollector fileIDCollector =
new IntraSPSNameNodeFileIdCollector(fsDir, sps);
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.getStorageMovementQueue().activate();
@ -1407,6 +1416,13 @@ public boolean isRunning() {
dfs.delete(new Path("/root/D/L"), true);
assertTraversal(expectedTraverseOrder, fsDir, sps);
dfs.delete(new Path("/root"), true);
}
private void assertTraversal(List<String> expectedTraverseOrder,
FSDirectory fsDir, StoragePolicySatisfier sps)
throws InterruptedException {
// Remove 10 elements to make the queue free, so other traversal will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
@ -1426,7 +1442,6 @@ public boolean isRunning() {
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
dfs.delete(new Path("/root"), true);
}
/**
@ -1473,8 +1488,8 @@ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD}};
startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
// increase replication factor to 4 for the first 10 files and thus
// initiate replica tasks
@ -1772,7 +1787,7 @@ public Boolean get() {
}, 100, timeout);
}
private void writeContent(final String fileName) throws IOException {
public void writeContent(final String fileName) throws IOException {
writeContent(fileName, (short) 3);
}
@ -1805,7 +1820,7 @@ private void startAdditionalDNs(final Configuration conf,
cluster.triggerHeartbeats();
}
private MiniDFSCluster startCluster(final Configuration conf,
public MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
import org.junit.Ignore;
/**
* Tests the external sps service plugins.
*/
public class TestExternalStoragePolicySatisfier
extends TestStoragePolicySatisfier {
private StorageType[][] allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
@Override
public void createCluster() throws IOException {
getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
getConf().setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
STORAGES_PER_DATANODE, CAPACITY));
getFS();
writeContent(FILE);
}
@Override
public MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
for (int i = 0; i < numberOfDatanodes; i++) {
for (int j = 0; j < storagesPerDn; j++) {
capacities[i][j] = nodeCapacity;
}
}
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
.storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive();
if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
false)) {
SPSService spsService = cluster.getNameNode().getNamesystem()
.getBlockManager().getSPSService();
spsService.stopGracefully();
IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
cluster.getNameNode().getNamesystem(),
cluster.getNameNode().getNamesystem().getBlockManager(), cluster
.getNameNode().getNamesystem().getBlockManager().getSPSService());
spsService.init(context,
new ExternalSPSFileIDCollector(context,
cluster.getNameNode().getNamesystem().getBlockManager()
.getSPSService(),
5),
new IntraSPSNameNodeBlockMoveTaskHandler(
cluster.getNameNode().getNamesystem().getBlockManager(),
cluster.getNameNode().getNamesystem()));
spsService.start(true);
}
return cluster;
}
@Override
public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
Context ctxt) {
return new ExternalSPSFileIDCollector(ctxt, sps, 5);
}
/**
* This test need not run as the external scan is not batch based right
* now.
*/
@Ignore("ExternalFileIdCollector is not batch based right now."
+ " So, ignoring it.")
public void testBatchProcessingForSPSDirectory() throws Exception {
}
}