HDFS-11123. [SPS] Make storage policy satisfier daemon work on/off dynamically. Contributed by Uma Maheswara Rao G

This commit is contained in:
Rakesh Radhakrishnan 2016-12-14 17:49:44 +05:30 committed by Uma Maheswara Rao Gangumalla
parent cd5262aba0
commit 5179d99b7e
11 changed files with 265 additions and 35 deletions

View File

@ -5026,7 +5026,52 @@ public void satisfyStoragePolicy(long id) {
}
}
/**
* Gets the storage policy satisfier instance.
*
* @return sps
*/
public StoragePolicySatisfier getStoragePolicySatisfier() {
return sps;
}
/**
* Activate the storage policy satisfier by starting its service.
*/
public void activateSPS() {
if (sps == null) {
LOG.info("Storage policy satisfier is not initialized.");
return;
} else if (sps.isRunning()) {
LOG.info("Storage policy satisfier is already running.");
return;
}
sps.start();
}
/**
* Deactivate the storage policy satisfier by stopping its services.
*/
public void deactivateSPS() {
if (sps == null) {
LOG.info("Storage policy satisfier is not initialized.");
return;
} else if (!sps.isRunning()) {
LOG.info("Storage policy satisfier is already stopped.");
return;
}
sps.stop();
// TODO: add command to DNs for stop in-progress processing SPS commands?
// to avoid confusions in cluster, I think sending commands from centralized
// place would be better to drop pending queues at DN. Anyway in progress
// work will be finished in a while, but this command can void starting
// fresh movements at DN.
}
/**
* @return True if storage policy satisfier running.
*/
public boolean isStoragePolicySatisfierRunning() {
return sps == null ? false : sps.isRunning();
}
}

View File

@ -49,7 +49,7 @@ public class BlockStorageMovementAttemptedItems {
// processing and sent to DNs.
private final Map<Long, Long> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean spsRunning = true;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
//
// It might take anywhere between 30 to 60 minutes before
@ -109,7 +109,8 @@ public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
/**
* Starts the monitor thread.
*/
void start() {
public synchronized void start() {
monitorRunning = true;
timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
timerThread.start();
@ -118,8 +119,14 @@ void start() {
/**
* Stops the monitor thread.
*/
public void stop() {
spsRunning = false;
public synchronized void stop() {
monitorRunning = false;
timerThread.interrupt();
try {
timerThread.join(3000);
} catch (InterruptedException ie) {
}
this.clearQueues();
}
/**
@ -129,13 +136,13 @@ public void stop() {
private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
@Override
public void run() {
while (spsRunning) {
while (monitorRunning) {
try {
blockStorageMovementResultCheck();
blocksStorageMovementUnReportedItemsCheck();
Thread.sleep(checkTimeout);
} catch (InterruptedException ie) {
LOG.debug("BlocksStorageMovementAttemptResultMonitor thread "
LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+ "is interrupted.", ie);
}
}
@ -222,4 +229,9 @@ public int resultsCount() {
public int getAttemptedItemsCount() {
return storageMovementAttemptedItems.size();
}
public void clearQueues() {
storageMovementAttemptedResults.clear();
storageMovementAttemptedItems.clear();
}
}

View File

@ -50,4 +50,8 @@ public synchronized void add(Long blockCollectionID) {
public synchronized Long get() {
return storageMovementNeeded.poll();
}
public synchronized void clearAll() {
storageMovementNeeded.clear();
}
}

View File

@ -193,14 +193,6 @@ static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
String src) throws IOException {
// make sure storage policy is enabled, otherwise
// there is no need to satisfy storage policy.
if (!fsd.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to satisfy storage policy since %s is set to false.",
DFS_STORAGE_POLICY_ENABLED_KEY));
}
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();

View File

@ -89,7 +89,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@ -2237,6 +2239,22 @@ void satisfyStoragePolicy(String src) throws IOException {
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot satisfy storage policy for " + src);
// make sure storage policy is enabled, otherwise
// there is no need to satisfy storage policy.
if (!dir.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to satisfy storage policy since %s is set to false.",
DFS_STORAGE_POLICY_ENABLED_KEY));
}
if (blockManager.getStoragePolicySatisfier() == null
|| !blockManager.getStoragePolicySatisfier().isRunning()) {
throw new UnsupportedActionException(
"Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been deactivated"
+ " by admin. Seek for an admin help to activate it "
+ "or use Mover tool.");
}
// TODO: need to update editlog for persistence.
FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src);
} finally {
@ -3895,11 +3913,18 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
// TODO: Handle blocks movement results send by the coordinator datanode.
// This has to be revisited as part of HDFS-11029.
if (blockManager.getStoragePolicySatisfier() != null) {
blockManager.getStoragePolicySatisfier()
.handleBlocksStorageMovementResults(blksMovementResults);
// Handle blocks movement results sent by the coordinator datanode.
StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
if (sps != null) {
if (!sps.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Storage policy satisfier is not running. So, ignoring block "
+ "storage movement results sent by co-ordinator datanode");
}
} else {
sps.handleBlocksStorageMovementResults(blksMovementResults);
}
}
//create ha status

View File

@ -160,6 +160,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
@ -293,7 +294,8 @@ public enum OperationCategory {
DFS_HEARTBEAT_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
FS_PROTECTED_DIRECTORIES,
HADOOP_CALLER_CONTEXT_ENABLED_KEY));
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@ -2039,6 +2041,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
return reconfCallerContextEnabled(newVal);
} else if (property.equals(ipcClientRPCBackoffEnable)) {
return reconfigureIPCBackoffEnabled(newVal);
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY)) {
return reconfigureSPSActivate(newVal, property);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
@ -2122,6 +2126,26 @@ String reconfigureIPCBackoffEnabled(String newVal) {
return Boolean.toString(clientBackoffEnabled);
}
String reconfigureSPSActivate(String newVal, String property)
throws ReconfigurationException {
if (newVal == null || !(newVal.equalsIgnoreCase(Boolean.TRUE.toString())
|| newVal.equalsIgnoreCase(Boolean.FALSE.toString()))) {
throw new ReconfigurationException(property, newVal,
getConf().get(property),
new HadoopIllegalArgumentException(
"For activating or deactivating storage policy satisfier, "
+ "we must pass true/false only"));
}
boolean activateSPS = Boolean.parseBoolean(newVal);
if (activateSPS) {
namesystem.getBlockManager().activateSPS();
} else {
namesystem.getBlockManager().deactivateSPS();
}
return newVal;
}
@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();

View File

@ -2528,8 +2528,6 @@ public boolean isStoragePolicySatisfierRunning() throws IOException {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
StoragePolicySatisfier sps = namesystem.getBlockManager()
.getStoragePolicySatisfier();
return sps != null && sps.isRunning();
return namesystem.getBlockManager().isStoragePolicySatisfierRunning();
}
}

View File

@ -91,7 +91,9 @@ public StoragePolicySatisfier(final Namesystem namesystem,
* Start storage policy satisfier demon thread. Also start block storage
* movements monitor for retry the attempts if needed.
*/
public void start() {
public synchronized void start() {
isRunning = true;
LOG.info("Starting StoragePolicySatisfier.");
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@ -101,8 +103,9 @@ public void start() {
/**
* Stop storage policy satisfier demon thread.
*/
public void stop() {
public synchronized void stop() {
isRunning = false;
LOG.info("Stopping StoragePolicySatisfier.");
if (storagePolicySatisfierThread == null) {
return;
}
@ -112,6 +115,7 @@ public void stop() {
} catch (InterruptedException ie) {
}
this.storageMovementsMonitor.stop();
this.clearQueues();
}
/**
@ -141,14 +145,20 @@ private boolean checkIfMoverRunning() {
@Override
public void run() {
isRunning = !checkIfMoverRunning();
if (!isRunning) {
LOG.error("StoragePolicySatisfier thread stopped "
+ "as Mover ID file " + HdfsServerConstants.MOVER_ID_PATH.toString()
+ " exists");
return;
boolean isMoverRunning = !checkIfMoverRunning();
synchronized (this) {
isRunning = isMoverRunning;
if (!isRunning) {
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stop();
LOG.error(
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
+ HdfsServerConstants.MOVER_ID_PATH.toString() + " exists");
return;
}
}
while (namesystem.isRunning()) {
while (namesystem.isRunning() && isRunning) {
try {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
@ -159,7 +169,12 @@ public void run() {
// we want to check block movements.
Thread.sleep(3000);
} catch (Throwable t) {
isRunning = false;
synchronized (this) {
isRunning = false;
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stop();
}
if (!namesystem.isRunning()) {
LOG.info("Stopping StoragePolicySatisfier.");
if (!(t instanceof InterruptedException)) {
@ -488,4 +503,14 @@ void handleBlocksStorageMovementResults(
BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
/**
* Clear the queues from to be storage movement needed lists and items tracked
* in storage movement monitor.
*/
public void clearQueues() {
LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
+ "user requests on satisfying block storages would be discarded.");
storageMovementNeeded.clearAll();
}
}

View File

@ -4499,8 +4499,13 @@
<name>dfs.storage.policy.satisfier.activate</name>
<value>true</value>
<description>
If true, activate StoragePolicySatisfier.
If true, StoragePolicySatisfier will be started along with active namenode.
By default, StoragePolicySatisfier is activated.
Administrator can dynamically activate or deactivate StoragePolicySatisfier by using reconfiguration option.
Dynamic activation/deactivation option can be achieved in the following way.
1. Edit/update this configuration property values in hdfs-site.xml
2. Execute the reconfig command on hadoop command line prompt.
For example:$hdfs -reconfig namenode nn_host:port start
</description>
</property>

View File

@ -30,9 +30,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
@ -40,6 +44,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
@ -216,6 +222,100 @@ public void testReconfigureHearbeatCheck() throws ReconfigurationException {
datanodeManager.getHeartbeatRecheckInterval());
}
/**
* Tests activate/deactivate Storage Policy Satisfier dynamically.
*/
@Test(timeout = 30000)
public void testReconfigureStoragePolicySatisfierActivated()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
verifySPSActivated(nameNode, DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
true);
// try invalid values
try {
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException e) {
GenericTestUtils.assertExceptionContains(
"For activating or deactivating storage policy satisfier, "
+ "we must pass true/false only",
e.getCause());
}
// enable SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"true");
verifySPSActivated(nameNode, DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
true);
// disable SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"false");
verifySPSActivated(nameNode, DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
false);
// revert to default
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"true");
assertEquals(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY + " has wrong value",
true, nameNode.getNamesystem().getBlockManager()
.isStoragePolicySatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY + " has wrong value",
true, nameNode.getConf()
.getBoolean(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false));
}
/**
* Test to satisfy storage policy after deactivating storage policy satisfier.
*/
@Test(timeout = 30000)
public void testSatisfyStoragePolicyAfterSatisfierDeactivated()
throws ReconfigurationException, IOException {
final NameNode nameNode = cluster.getNameNode();
// deactivate SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"false");
verifySPSActivated(nameNode, DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
false);
Path filePath = new Path("/testSPS");
DistributedFileSystem fileSystem = cluster.getFileSystem();
fileSystem.create(filePath);
fileSystem.setStoragePolicy(filePath, "COLD");
try {
fileSystem.satisfyStoragePolicy(filePath);
fail("Expected to fail, as storage policy feature has deactivated.");
} catch (RemoteException e) {
GenericTestUtils
.assertExceptionContains("Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been deactivated"
+ " by admin. Seek for an admin help to activate it "
+ "or use Mover tool.", e);
}
// revert to default
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
"true");
assertEquals(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY + " has wrong value",
true, nameNode.getNamesystem().getBlockManager()
.isStoragePolicySatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY + " has wrong value",
true, nameNode.getConf()
.getBoolean(DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false));
}
void verifySPSActivated(final NameNode nameNode, String property,
boolean expected) {
assertEquals(property + " has wrong value", expected, nameNode
.getNamesystem().getBlockManager().isStoragePolicySatisfierRunning());
assertEquals(property + " has wrong value", expected, nameNode.getConf()
.getBoolean(property, DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT));
}
@Test
public void testBlockInvalidateLimitAfterReconfigured()
throws ReconfigurationException {

View File

@ -394,7 +394,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(6, outs.size());
assertEquals(7, outs.size());
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
assertEquals(errs.size(), 0);