HDFS-13050: [SPS]: Create start/stop script to start external SPS process. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Rakesh Radhakrishnan 2018-01-29 03:10:48 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 99594b48b8
commit 5845c36c16
15 changed files with 259 additions and 128 deletions

View File

@ -63,6 +63,7 @@ function hadoop_usage
hadoop_add_subcommand "secondarynamenode" daemon "run the DFS secondary namenode"
hadoop_add_subcommand "snapshotDiff" client "diff two snapshots of a directory or diff the current directory contents with a snapshot"
hadoop_add_subcommand "storagepolicies" admin "list/get/set/satisfyStoragePolicy block storage policies"
hadoop_add_subcommand "sps" daemon "run external storagepolicysatisfier"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "zkfc" daemon "run the ZK Failover Controller daemon"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" false
@ -201,6 +202,10 @@ function hdfscmd_case
storagepolicies)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.StoragePolicyAdmin
;;
sps)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier
;;
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;

View File

@ -94,6 +94,9 @@
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@ -5106,9 +5109,15 @@ public void enableInternalSPS() {
return;
}
updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
sps),
new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
sps.start(true, spsMode);
}
/**
* Enable storage policy satisfier by starting its service.
*/

View File

@ -672,7 +672,7 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
}
if (spsRunning) {
System.err.println("Mover failed due to StoragePolicySatisfier"
+ " is running. Exiting with status "
+ " service running inside namenode. Exiting with status "
+ ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
}

View File

@ -175,9 +175,4 @@ boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
*/
String getFilePath(Long inodeId);
/**
* Close the resources.
*/
void close() throws IOException;
}

View File

@ -196,8 +196,4 @@ public String getFilePath(Long inodeId) {
return namesystem.getFilePath(inodeId);
}
@Override
public void close() throws IOException {
// Nothing to clean.
}
}

View File

@ -158,11 +158,15 @@ public void scanAndCollectFileIds(final Long startINodeId)
*/
public synchronized int remainingCapacity() {
int size = service.processingQueueSize();
if (size >= maxQueueLimitToScan) {
return 0;
} else {
return (maxQueueLimitToScan - size);
int remainingSize = 0;
if (size < maxQueueLimitToScan) {
remainingSize = maxQueueLimitToScan - size;
}
if (LOG.isDebugEnabled()) {
LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
}
return remainingSize;
}
class SPSTraverseInfo extends TraverseInfo {

View File

@ -31,6 +31,7 @@
public class SPSPathIds {
// List of pending dir to satisfy the policy
// TODO: Make this bounded queue.
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
/**

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -174,10 +175,11 @@ public synchronized void start(boolean reconfigStart,
return;
}
if (reconfigStart) {
LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+ "start it.");
LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
+ "start it.", StringUtils.toLowerCase(spsMode.toString()));
} else {
LOG.info("Starting StoragePolicySatisfier.");
LOG.info("Starting {} StoragePolicySatisfier.",
StringUtils.toLowerCase(spsMode.toString()));
}
// Ensure that all the previously submitted block movements(if any) have to
@ -243,7 +245,14 @@ private void addDropSPSWorkCommandsToAllDNs() {
@Override
public void run() {
while (ctxt.isRunning()) {
while (isRunning) {
// Check if dependent service is running
if (!ctxt.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Upstream service is down, skipping the sps work.");
}
continue;
}
try {
if (!ctxt.isInSafeMode()) {
ItemInfo itemInfo = storageMovementNeeded.get();
@ -284,33 +293,39 @@ public void run() {
// Just add to monitor, so it will be tracked for report and
// be removed on storage movement attempt finished report.
case BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
LOG.debug("Block analysis status:{} for the file path:{}."
+ " Adding to attempt monitor queue for the storage "
+ "movement attempt finished report",
status.status, fileStatus.getPath());
}
this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
.getStartId(), itemInfo.getFileId(), monotonicNow(),
status.assignedBlocks, itemInfo.getRetryCount()));
break;
case NO_BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + trackId
+ " back to retry queue as none of the blocks"
+ " found its eligible targets.");
LOG.debug("Adding trackID:{} for the file path:{} back to"
+ " retry queue as none of the blocks found its eligible"
+ " targets.", trackId, fileStatus.getPath());
}
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + trackId
+ " back to retry queue as some of the blocks"
+ " are low redundant.");
LOG.debug("Adding trackID:{} for the file path:{} back to "
+ "retry queue as some of the blocks are low redundant.",
trackId, fileStatus.getPath());
}
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + trackId
+ " back to retry queue as some of the blocks"
+ " movement failed.");
LOG.debug("Adding trackID:{} for the file path:{} back to "
+ "retry queue as some of the blocks movement failed.",
trackId, fileStatus.getPath());
}
this.storageMovementNeeded.add(itemInfo);
break;
@ -318,8 +333,9 @@ public void run() {
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
default:
LOG.info("Block analysis skipped or blocks already satisfied"
+ " with storages. So, Cleaning up the Xattrs.");
LOG.info("Block analysis status:{} for the file path:{}."
+ " So, Cleaning up the Xattrs.", status.status,
fileStatus.getPath());
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
break;
}
@ -346,20 +362,20 @@ private void handleException(Throwable t) {
if (isRunning) {
synchronized (this) {
if (isRunning) {
isRunning = false;
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
if (!(t instanceof InterruptedException)) {
LOG.info("StoragePolicySatisfier received an exception"
+ " while shutting down.", t);
if (t instanceof InterruptedException) {
isRunning = false;
LOG.info("Stopping StoragePolicySatisfier.");
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
} else {
LOG.error(
"StoragePolicySatisfier thread received runtime exception, "
+ "ignoring", t);
}
LOG.info("Stopping StoragePolicySatisfier.");
}
}
}
LOG.error("StoragePolicySatisfier thread received runtime exception. "
+ "Stopping Storage policy satisfier work", t);
return;
}
@ -374,9 +390,8 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
if (!lastBlkComplete) {
// Postpone, currently file is under construction
// So, should we add back? or leave it to user
LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ " this to the next retry iteration", fileInfo.getFileId());
LOG.info("File: {} is under construction. So, postpone"
+ " this to the next retry iteration", fileInfo.getPath());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new ArrayList<>());
@ -384,8 +399,8 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
if (blocks.size() == 0) {
LOG.info("BlockCollectionID: {} file is not having any blocks."
+ " So, skipping the analysis.", fileInfo.getFileId());
LOG.info("File: {} is not having any blocks."
+ " So, skipping the analysis.", fileInfo.getPath());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
@ -970,4 +985,12 @@ public BlockStorageMovementNeeded getStorageMovementQueue() {
public void markScanCompletedForPath(Long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
/**
* Join main SPS thread.
*/
public void join() throws InterruptedException {
//TODO Add join here on SPS rpc server also
storagePolicySatisfierThread.join();
}
}

View File

@ -110,7 +110,7 @@ public ExternalSPSBlockMoveTaskHandler(Configuration conf,
/**
* Initializes block movement tracker daemon and starts the thread.
*/
void init() {
public void init() {
movementTrackerThread = new Daemon(this.blkMovementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
movementTrackerThread.start();

View File

@ -19,19 +19,13 @@
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@ -57,13 +51,12 @@ public class ExternalSPSContext implements Context {
LoggerFactory.getLogger(ExternalSPSContext.class);
private SPSService service;
private NameNodeConnector nnc = null;
private Object nnConnectionLock = new Object();
private BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite();
public ExternalSPSContext(SPSService service) {
public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
this.service = service;
initializeNamenodeConnector();
this.nnc = nnc;
}
@Override
@ -73,7 +66,6 @@ public boolean isRunning() {
@Override
public boolean isInSafeMode() {
initializeNamenodeConnector();
try {
return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode()
: false;
@ -85,7 +77,6 @@ public boolean isInSafeMode() {
@Override
public boolean isMoverRunning() {
initializeNamenodeConnector();
try {
FSDataOutputStream out = nnc.getDistributedFileSystem()
.append(HdfsServerConstants.MOVER_ID_PATH);
@ -101,7 +92,6 @@ public boolean isMoverRunning() {
@Override
public long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException {
initializeNamenodeConnector();
HdfsFileStatus fs = null;
try {
fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
@ -121,7 +111,6 @@ public NetworkTopology getNetworkTopology() {
@Override
public boolean isFileExist(long inodeId) {
initializeNamenodeConnector();
String filePath = null;
try {
filePath = getFilePath(inodeId);
@ -145,14 +134,12 @@ public void addDropPreviousSPSWorkAtDNs() {
@Override
public void removeSPSHint(long inodeId) throws IOException {
initializeNamenodeConnector();
nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
}
@Override
public int getNumLiveDataNodes() {
initializeNamenodeConnector();
try {
return nnc.getDistributedFileSystem()
.getDataNodeStats(DatanodeReportType.LIVE).length;
@ -164,7 +151,6 @@ public int getNumLiveDataNodes() {
@Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
initializeNamenodeConnector();
return nnc.getDistributedFileSystem().getClient()
.getLocatedFileInfo(getFilePath(inodeID), false);
}
@ -172,13 +158,11 @@ public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
@Override
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
initializeNamenodeConnector();
return nnc.getLiveDatanodeStorageReport();
}
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
} catch (IOException e) {
@ -191,7 +175,6 @@ public boolean hasLowRedundancyBlocks(long inodeID) {
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
estimatedSize);
@ -204,7 +187,6 @@ public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
@Override
public Long getNextSPSPathId() {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().getNextSPSPathId();
} catch (IOException e) {
@ -233,39 +215,4 @@ public String getFilePath(Long inodeId) {
return null;
}
}
@Override
public void close() throws IOException {
synchronized (nnConnectionLock) {
if (nnc != null) {
nnc.close();
}
}
}
private void initializeNamenodeConnector() {
synchronized (nnConnectionLock) {
if (nnc == null) {
try {
nnc = getNameNodeConnector(service.getConf());
} catch (IOException e) {
LOG.warn("Exception while creating Namenode Connector.."
+ "Namenode might not have started.", e);
}
}
}
}
public static NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
List<NameNodeConnector> nncs = Collections.emptyList();
NameNodeConnector.checkOtherInstanceRunning(false);
nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
ExternalSPSContext.class.getSimpleName(),
HdfsServerConstants.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
}
}

View File

@ -139,11 +139,15 @@ private void checkProcessingQueuesFree() {
*/
public int remainingCapacity() {
int size = service.processingQueueSize();
if (size >= maxQueueLimitToScan) {
return 0;
} else {
return (maxQueueLimitToScan - size);
int remainingSize = 0;
if (size < maxQueueLimitToScan) {
remainingSize = maxQueueLimitToScan - size;
}
if (LOG.isDebugEnabled()) {
LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
}
return remainingSize;
}
@Override

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class starts and runs external SPS service.
*/
@InterfaceAudience.Private
public class ExternalStoragePolicySatisfier {
public static final Logger LOG = LoggerFactory
.getLogger(ExternalStoragePolicySatisfier.class);
/**
* Main method to start SPS service.
*/
public static void main(String args[]) throws Exception {
NameNodeConnector nnc = null;
try {
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
LOG);
HdfsConfiguration spsConf = new HdfsConfiguration();
//TODO : login with SPS keytab
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
nnc = getNameNodeConnector(spsConf);
boolean spsRunning;
spsRunning = nnc.getDistributedFileSystem().getClient()
.isStoragePolicySatisfierRunning();
if (spsRunning) {
throw new RuntimeException(
"Startup failed due to StoragePolicySatisfier"
+ " running inside Namenode.");
}
ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
externalHandler.init();
sps.init(context, new ExternalSPSFileIDCollector(context, sps),
externalHandler, blkMoveListener);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
if (sps != null) {
sps.join();
}
} catch (Throwable e) {
LOG.error("Failed to start storage policy satisfier.", e);
terminate(1, e);
} finally {
if (nnc != null) {
nnc.close();
}
}
}
private static NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException, InterruptedException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
while (true) {
try {
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(namenodes,
StoragePolicySatisfier.class.getSimpleName(),
externalSPSPathId, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
} catch (IOException e) {
LOG.warn("Failed to connect with namenode", e);
Thread.sleep(3000); // retry the connection after few secs
}
}
}
/**
* It is implementation of BlockMovementListener.
*/
private static class ExternalBlockMovementListener
implements BlockMovementListener {
private List<Block> actualBlockMovements = new ArrayList<>();
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
for (Block block : moveAttemptFinishedBlks) {
actualBlockMovements.add(block);
}
LOG.info("Movement attempted blocks", actualBlockMovements);
}
}
}

View File

@ -238,5 +238,13 @@ Check the running status of Storage Policy Satisfier service in namenode. If it
### Enable(internal service inside NN or external service outside NN) or Disable SPS without restarting Namenode
If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(internal or external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
+ hdfs dfsadmin -reconfig namenode <host:ipc_port> start
* Command:
hdfs dfsadmin -reconfig namenode <host:ipc_port> start
### Start External SPS Service.
If administrator wants to start external sps, first he/she needs to configure property `dfs.storage.policy.satisfier.mode` with `external` value in configuration file (`hdfs-site.xml`) and then run Namenode reconfig command. After this start external sps service using following command
* Command:
hdfs --daemon start sps

View File

@ -603,7 +603,7 @@ public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
if (out != null) {
out.close();
}
hdfsCluster.shutdown();
shutdownCluster();
}
}
@ -626,9 +626,7 @@ public void testWhenMoverExitsWithoutDeleteMoverIDFile()
Assert.assertTrue("SPS should be running as "
+ "no Mover really running", running);
} finally {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
shutdownCluster();
}
}
@ -672,9 +670,7 @@ public void testMoveWithBlockPinning() throws Exception {
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.DISK, 2, 30000, dfs);
} finally {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
shutdownCluster();
}
}
@ -1381,7 +1377,11 @@ private void assertTraversal(List<String> expectedTraverseOrder,
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
long trackId = sps.getStorageMovementQueue().get().getFileId();
ItemInfo itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) {
continue;
}
long trackId = itemInfo.getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@ -1392,7 +1392,11 @@ private void assertTraversal(List<String> expectedTraverseOrder,
// Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) {
long trackId = sps.getStorageMovementQueue().get().getFileId();
ItemInfo itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) {
continue;
}
long trackId = itemInfo.getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));

View File

@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -43,8 +42,6 @@
import org.junit.Assert;
import org.junit.Ignore;
import com.google.common.collect.Maps;
/**
* Tests the external sps service plugins.
*/
@ -95,7 +92,8 @@ public MiniDFSCluster startCluster(final Configuration conf,
SPSService spsService = blkMgr.getSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService);
ExternalSPSContext context = new ExternalSPSContext(spsService,
getNameNodeConnector(conf));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
@ -124,7 +122,8 @@ public void restartNamenode() throws IOException{
spsService = blkMgr.getSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService);
ExternalSPSContext context = new ExternalSPSContext(spsService,
getNameNodeConnector(getConf()));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
@ -161,16 +160,22 @@ private NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
final Path externalSPSPathId = new Path("/system/tmp.id");
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(nnMap,
StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
NameNodeConnector.checkOtherInstanceRunning(false);
while (true) {
try {
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(namenodes,
StoragePolicySatisfier.class.getSimpleName(),
externalSPSPathId, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
} catch (IOException e) {
LOG.warn("Failed to connect with namenode", e);
// Ignore
}
}
}
/**