HDFS-11336: [SPS]: Remove xAttrs when movements done or SPS disabled. Contributed by Yuanbo Liu.
This commit is contained in:
parent
9b15f5418d
commit
c00be44463
@ -19,6 +19,7 @@
|
||||
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
@ -54,6 +55,7 @@ public class BlockStorageMovementAttemptedItems {
|
||||
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
|
||||
private volatile boolean monitorRunning = true;
|
||||
private Daemon timerThread = null;
|
||||
private final StoragePolicySatisfier sps;
|
||||
//
|
||||
// It might take anywhere between 30 to 60 minutes before
|
||||
// a request is timed out.
|
||||
@ -69,7 +71,8 @@ public class BlockStorageMovementAttemptedItems {
|
||||
|
||||
public BlockStorageMovementAttemptedItems(long recheckTimeout,
|
||||
long selfRetryTimeout,
|
||||
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
|
||||
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
|
||||
StoragePolicySatisfier sps) {
|
||||
if (recheckTimeout > 0) {
|
||||
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
|
||||
}
|
||||
@ -78,6 +81,7 @@ public BlockStorageMovementAttemptedItems(long recheckTimeout,
|
||||
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
|
||||
storageMovementAttemptedItems = new HashMap<>();
|
||||
storageMovementAttemptedResults = new ArrayList<>();
|
||||
this.sps = sps;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -200,6 +204,9 @@ public void run() {
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
|
||||
+ "is interrupted.", ie);
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
|
||||
+ "received exception and exiting.", ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -248,7 +255,7 @@ private boolean isExistInResult(Long blockCollectionID) {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void blockStorageMovementResultCheck() {
|
||||
void blockStorageMovementResultCheck() throws IOException {
|
||||
synchronized (storageMovementAttemptedResults) {
|
||||
Iterator<BlocksStorageMovementResult> resultsIter =
|
||||
storageMovementAttemptedResults.iterator();
|
||||
@ -296,6 +303,9 @@ void blockStorageMovementResultCheck() {
|
||||
+ " reported from co-ordinating datanode. But the trackID "
|
||||
+ "doesn't exists in storageMovementAttemptedItems list",
|
||||
storageMovementAttemptedResult.getTrackId());
|
||||
// Remove xattr for the track id.
|
||||
this.sps.notifyBlkStorageMovementFinished(
|
||||
storageMovementAttemptedResult.getTrackId());
|
||||
}
|
||||
}
|
||||
// Remove trackID from the attempted list, if any.
|
||||
|
@ -534,6 +534,14 @@ private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static void unprotectedRemoveSPSXAttr(INode inode, XAttr spsXAttr)
|
||||
throws IOException{
|
||||
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
|
||||
existingXAttrs.remove(spsXAttr);
|
||||
XAttrStorage.updateINodeXAttrs(inode, existingXAttrs,
|
||||
INodesInPath.fromINode(inode).getLatestSnapshotId());
|
||||
}
|
||||
|
||||
private static void setDirStoragePolicy(
|
||||
FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
|
||||
INode inode = FSDirectory.resolveLastINode(iip);
|
||||
|
@ -1418,6 +1418,22 @@ private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
|
||||
getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the SPS xattr from the inode, retrieve the inode from the
|
||||
* block collection id.
|
||||
* @param id
|
||||
* - file block collection id.
|
||||
*/
|
||||
public void removeSPSXattr(long id) throws IOException {
|
||||
final INode inode = getInode(id);
|
||||
final XAttrFeature xaf = inode.getXAttrFeature();
|
||||
final XAttr spsXAttr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
|
||||
|
||||
if (spsXAttr != null) {
|
||||
FSDirAttrOp.unprotectedRemoveSPSXAttr(inode, spsXAttr);
|
||||
}
|
||||
}
|
||||
|
||||
private void addEncryptionZone(INodeWithAdditionalFields inode,
|
||||
XAttrFeature xaf) {
|
||||
if (xaf == null) {
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@ -91,7 +92,8 @@ public StoragePolicySatisfier(final Namesystem namesystem,
|
||||
conf.getLong(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
|
||||
storageMovementNeeded);
|
||||
storageMovementNeeded,
|
||||
this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -119,12 +121,6 @@ public synchronized void start(boolean reconfigStart) {
|
||||
*/
|
||||
public synchronized void stop(boolean reconfigStop) {
|
||||
isRunning = false;
|
||||
if (reconfigStop) {
|
||||
LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
|
||||
+ "deactivate it.");
|
||||
} else {
|
||||
LOG.info("Stopping StoragePolicySatisfier.");
|
||||
}
|
||||
if (storagePolicySatisfierThread == null) {
|
||||
return;
|
||||
}
|
||||
@ -135,8 +131,12 @@ public synchronized void stop(boolean reconfigStop) {
|
||||
}
|
||||
this.storageMovementsMonitor.stop();
|
||||
if (reconfigStop) {
|
||||
this.clearQueues();
|
||||
LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
|
||||
+ "deactivate it.");
|
||||
this.clearQueuesWithNotification();
|
||||
this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
|
||||
} else {
|
||||
LOG.info("Stopping StoragePolicySatisfier.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -717,4 +717,33 @@ public void clearQueues() {
|
||||
+ "user requests on satisfying block storages would be discarded.");
|
||||
storageMovementNeeded.clearAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean all the movements in storageMovementNeeded and notify
|
||||
* to clean up required resources.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void clearQueuesWithNotification() {
|
||||
Long id;
|
||||
while ((id = storageMovementNeeded.get()) != null) {
|
||||
try {
|
||||
notifyBlkStorageMovementFinished(id);
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed to remove SPS "
|
||||
+ "xattr for collection id " + id, ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When block movement has been finished successfully, some additional
|
||||
* operations should be notified, for example, SPS xattr should be
|
||||
* removed.
|
||||
* @param trackId track id i.e., block collection id.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void notifyBlkStorageMovementFinished(long trackId)
|
||||
throws IOException {
|
||||
this.namesystem.getFSDirectory().removeSPSXattr(trackId);
|
||||
}
|
||||
}
|
||||
|
@ -2454,6 +2454,6 @@ public Boolean get() {
|
||||
+ expectedStorageCount + " and actual=" + actualStorageCount);
|
||||
return expectedStorageCount == actualStorageCount;
|
||||
}
|
||||
}, 1000, timeout);
|
||||
}, 500, timeout);
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Tests that block storage movement attempt failures are reported from DN and
|
||||
@ -36,10 +37,11 @@ public class TestBlockStorageMovementAttemptedItems {
|
||||
private final int selfRetryTimeout = 500;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
public void setup() throws Exception {
|
||||
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
|
||||
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
|
||||
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
|
||||
selfRetryTimeout, unsatisfiedStorageMovementFiles);
|
||||
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -20,16 +20,22 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
* Test persistence of satisfying files/directories.
|
||||
@ -72,7 +78,16 @@ public class TestPersistentStoragePolicySatisfier {
|
||||
* @throws IOException
|
||||
*/
|
||||
public void clusterSetUp() throws Exception {
|
||||
clusterSetUp(false);
|
||||
clusterSetUp(false, new HdfsConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup environment for every test case.
|
||||
* @param hdfsConf hdfs conf.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void clusterSetUp(Configuration hdfsConf) throws Exception {
|
||||
clusterSetUp(false, hdfsConf);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -80,8 +95,9 @@ public void clusterSetUp() throws Exception {
|
||||
* @param isHAEnabled if true, enable simple HA.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void clusterSetUp(boolean isHAEnabled) throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
|
||||
throws Exception {
|
||||
conf = newConf;
|
||||
final int dnNumber = storageTypes.length;
|
||||
final short replication = 3;
|
||||
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
|
||||
@ -188,7 +204,7 @@ public void testWithCheckpoint() throws Exception {
|
||||
public void testWithHA() throws Exception {
|
||||
try {
|
||||
// Enable HA env for testing.
|
||||
clusterSetUp(true);
|
||||
clusterSetUp(true, new HdfsConfiguration());
|
||||
|
||||
fs.setStoragePolicy(testFile, ALL_SSD);
|
||||
fs.satisfyStoragePolicy(testFile);
|
||||
@ -297,6 +313,94 @@ public void testWithFederationHA() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verify SPS xattr will be removed if the satisfy work has
|
||||
* been finished, expect that the method satisfyStoragePolicy can be
|
||||
* invoked on the same file again after the block movement has been
|
||||
* finished:
|
||||
* 1. satisfy storage policy of file1.
|
||||
* 2. wait until storage policy is satisfied.
|
||||
* 3. satisfy storage policy of file1 again
|
||||
* 4. make sure step 3 works as expected.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testMultipleSatisfyStoragePolicy() throws Exception {
|
||||
try {
|
||||
// Lower block movement check for testing.
|
||||
conf = new HdfsConfiguration();
|
||||
final long minCheckTimeout = 500; // minimum value
|
||||
conf.setLong(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
|
||||
minCheckTimeout);
|
||||
clusterSetUp(conf);
|
||||
fs.setStoragePolicy(testFile, ONE_SSD);
|
||||
fs.satisfyStoragePolicy(testFile);
|
||||
DFSTestUtil.waitExpectedStorageType(
|
||||
testFileName, StorageType.SSD, 1, timeout, fs);
|
||||
DFSTestUtil.waitExpectedStorageType(
|
||||
testFileName, StorageType.DISK, 2, timeout, fs);
|
||||
|
||||
// Make sure that SPS xattr has been removed.
|
||||
int retryTime = 0;
|
||||
while (retryTime < 30) {
|
||||
if (!fileContainsSPSXAttr(testFile)) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(minCheckTimeout);
|
||||
retryTime += 1;
|
||||
}
|
||||
|
||||
fs.setStoragePolicy(testFile, COLD);
|
||||
fs.satisfyStoragePolicy(testFile);
|
||||
DFSTestUtil.waitExpectedStorageType(
|
||||
testFileName, StorageType.ARCHIVE, 3, timeout, fs);
|
||||
} finally {
|
||||
clusterShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verify SPS xattr is removed after SPS is dropped,
|
||||
* expect that if the SPS is disabled/dropped, the SPS
|
||||
* xattr should be removed accordingly:
|
||||
* 1. satisfy storage policy of file1.
|
||||
* 2. drop SPS thread in block manager.
|
||||
* 3. make sure sps xattr is removed.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testDropSPS() throws Exception {
|
||||
try {
|
||||
clusterSetUp();
|
||||
fs.setStoragePolicy(testFile, ONE_SSD);
|
||||
fs.satisfyStoragePolicy(testFile);
|
||||
|
||||
cluster.getNamesystem().getBlockManager().deactivateSPS();
|
||||
|
||||
// Make sure satisfy xattr has been removed.
|
||||
assertFalse(fileContainsSPSXAttr(testFile));
|
||||
|
||||
} finally {
|
||||
clusterShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether file contains SPS xattr.
|
||||
* @param fileName file name.
|
||||
* @return true if file contains SPS xattr.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean fileContainsSPSXAttr(Path fileName) throws IOException {
|
||||
final INode inode = cluster.getNamesystem()
|
||||
.getFSDirectory().getINode(fileName.toString());
|
||||
final XAttr satisfyXAttr =
|
||||
XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
|
||||
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
|
||||
return existingXAttrs.contains(satisfyXAttr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart the hole env and trigger the DataNode's heart beats.
|
||||
* @throws Exception
|
||||
|
Loading…
Reference in New Issue
Block a user