HDFS-16477. [SPS]: Add metric PendingSPSPaths for getting the number of paths to be processed by SPS (#4009). Contributed by tomscut.

Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
litao 2022-04-03 04:06:03 +08:00 committed by GitHub
parent 4b1a6bfb10
commit 34b3275bf4
17 changed files with 137 additions and 25 deletions


@@ -299,6 +299,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
| `FSN(Read/Write)Lock`*OperationName*`NanosAvgTime` | Average time of holding the lock by operations in nanoseconds |
| `FSN(Read/Write)LockOverallNanosNumOps` | Total number of acquiring lock by all operations |
| `FSN(Read/Write)LockOverallNanosAvgTime` | Average time of holding the lock by all operations in nanoseconds |
| `PendingSPSPaths` | The number of paths to be processed by storage policy satisfier |
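
For context, this gauge is published with the rest of the FSNamesystem metrics and can be polled over the NameNode's JMX HTTP servlet. A minimal sketch, assuming the default NameNode web port (9870) and the Hadoop:service=NameNode,name=FSNamesystem bean; the regex extraction is illustrative only:

import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical monitoring snippet, not part of this commit.
public class PendingSpsPathsProbe {
  public static void main(String[] args) throws Exception {
    URL jmx = new URL(
        "http://namenode:9870/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem");
    try (InputStream in = jmx.openStream()) {
      String body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
      Matcher m = Pattern.compile("\"PendingSPSPaths\"\\s*:\\s*(\\d+)").matcher(body);
      System.out.println(m.find() ? "PendingSPSPaths = " + m.group(1) : "not found");
    }
  }
}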
JournalNode
-----------


@@ -343,4 +343,11 @@ public interface FederationMBean {
* with the highest risk of loss.
*/
long getHighestPriorityLowRedundancyECBlocks();
/**
* Returns the number of paths to be processed by storage policy satisfier.
*
* @return The number of paths to be processed by sps.
*/
int getPendingSPSPaths();
}
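
A client with access to the Router's platform MBean server could read the aggregated value directly; a sketch, where the ObjectName is an assumption based on the usual RBF registration and is not shown in this diff:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical in-process read of the federation bean attribute.
public final class FederationSpsGauge {
  public static int pendingSPSPaths() throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Assumed bean name; verify against the Router's /jmx output.
    ObjectName bean = new ObjectName("Hadoop:service=Router,name=FederationState");
    return (Integer) mbs.getAttribute(bean, "PendingSPSPaths");
  }
}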


@@ -874,6 +874,16 @@ public long getCurrentTokensCount() {
return 0;
}
@Override
public int getPendingSPSPaths() {
try {
return getRBFMetrics().getPendingSPSPaths();
} catch (IOException e) {
LOG.debug("Failed to get number of paths to be processed by sps", e);
}
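// Metrics calls must never throw; report zero until the Router and its RBF metrics are initialized.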
return 0;
}
private Router getRouter() throws IOException {
if (this.router == null) {
throw new IOException("Router is not initialized");


@@ -746,6 +746,12 @@ public long getHighestPriorityLowRedundancyECBlocks() {
MembershipStats::getHighestPriorityLowRedundancyECBlocks);
}
@Override
public int getPendingSPSPaths() {
return getNameserviceAggregatedInt(
MembershipStats::getPendingSPSPaths);
}
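
The override sums the stat across all active nameservice memberships via getNameserviceAggregatedInt; a minimal standalone sketch of that summing pattern (the generic helper is illustrative, not the real implementation):

import java.util.List;
import java.util.function.ToIntFunction;

// Sums one int stat over the records of every active nameservice.
final class NameserviceIntSum {
  static <S> int aggregate(List<S> activeStats, ToIntFunction<S> getter) {
    int total = 0;
    for (S stats : activeStats) {
      total += getter.applyAsInt(stats); // e.g. MembershipStats::getPendingSPSPaths
    }
    return total;
  }
}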
@Override
@Metric({"RouterFederationRenameCount", "Number of federation rename"})
public int getRouterFederationRenameCount() {


@@ -306,6 +306,7 @@ public boolean registerNamenode(NamenodeStatusReport report)
report.getHighestPriorityLowRedundancyReplicatedBlocks());
stats.setHighestPriorityLowRedundancyECBlocks(
report.getHighestPriorityLowRedundancyECBlocks());
stats.setPendingSPSPaths(report.getPendingSPSPaths());
record.setStats(stats);
}


@@ -75,6 +75,7 @@ public class NamenodeStatusReport {
private long numberOfMissingBlocksWithReplicationFactorOne = -1;
private long highestPriorityLowRedundancyReplicatedBlocks = -1;
private long highestPriorityLowRedundancyECBlocks = -1;
private int pendingSPSPaths = -1;
/** If the fields are valid. */
private boolean registrationValid = false;
@@ -367,12 +368,13 @@ public int getNumEnteringMaintenanceDataNodes() {
* @param numBlocksPendingReplication Number of blocks pending replication.
* @param numBlocksUnderReplicated Number of blocks under replication.
* @param numBlocksPendingDeletion Number of blocks pending deletion.
- * @param providedSpace Space in provided storage.
+ * @param providedStorageSpace Space in provided storage.
+ * @param numPendingSPSPaths The number of paths to be processed by storage policy satisfier.
*/
public void setNamesystemInfo(long available, long total,
long numFiles, long numBlocks, long numBlocksMissing,
long numBlocksPendingReplication, long numBlocksUnderReplicated,
- long numBlocksPendingDeletion, long providedSpace) {
+ long numBlocksPendingDeletion, long providedStorageSpace, int numPendingSPSPaths) {
this.totalSpace = total;
this.availableSpace = available;
this.numOfBlocks = numBlocks;
@@ -382,7 +384,8 @@ public void setNamesystemInfo(long available, long total,
this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
this.numOfFiles = numFiles;
this.statsValid = true;
- this.providedSpace = providedSpace;
+ this.providedSpace = providedStorageSpace;
+ this.pendingSPSPaths = numPendingSPSPaths;
}
/**
@@ -460,6 +463,15 @@ public long getHighestPriorityLowRedundancyECBlocks() {
return this.highestPriorityLowRedundancyECBlocks;
}
/**
* Returns the number of paths to be processed by storage policy satisfier.
*
* @return The number of paths to be processed by sps.
*/
public int getPendingSPSPaths() {
return this.pendingSPSPaths;
}
/**
* Get the number of blocks.
*


@@ -478,7 +478,8 @@ private void getFsNamesystemMetrics(String address,
jsonObject.getLong("PendingReplicationBlocks"),
jsonObject.getLong("UnderReplicatedBlocks"),
jsonObject.getLong("PendingDeletionBlocks"),
jsonObject.optLong("ProvidedCapacityTotal"));
jsonObject.optLong("ProvidedCapacityTotal"),
jsonObject.getInt("PendingSPSPaths"));
}
}
}
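
The heartbeat reads these values out of the NameNode's JMX JSON; a sketch of the record shape with made-up values, assuming the Jettison JSONObject API used by this class. Note that getInt throws when the key is absent (for example, a NameNode without this patch), while the opt* variants return a default instead:

import org.codehaus.jettison.json.JSONObject;

// Hypothetical record shape; real values come from the NameNode's /jmx servlet.
public class JmxRecordSketch {
  public static void main(String[] args) throws Exception {
    JSONObject jsonObject = new JSONObject(
        "{\"ProvidedCapacityTotal\":0,\"PendingSPSPaths\":3}");
    System.out.println(jsonObject.optLong("ProvidedCapacityTotal")); // 0
    System.out.println(jsonObject.getInt("PendingSPSPaths"));        // 3
  }
}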


@@ -133,6 +133,10 @@ public abstract void setHighestPriorityLowRedundancyECBlocks(
public abstract long getHighestPriorityLowRedundancyECBlocks();
public abstract void setPendingSPSPaths(int pendingSPSPaths);
public abstract int getPendingSPSPaths();
@Override
public SortedMap<String, String> getPrimaryKeys() {
// This record is not stored directly, no key needed


@@ -297,4 +297,14 @@ public long getHighestPriorityLowRedundancyECBlocks() {
return this.translator.getProtoOrBuilder()
.getHighestPriorityLowRedundancyECBlocks();
}
@Override
public void setPendingSPSPaths(int pendingSPSPaths) {
this.translator.getBuilder().setPendingSPSPaths(pendingSPSPaths);
}
@Override
public int getPendingSPSPaths() {
return this.translator.getProtoOrBuilder().getPendingSPSPaths();
}
}


@@ -54,6 +54,7 @@ message NamenodeMembershipStatsRecordProto {
optional uint64 numberOfMissingBlocksWithReplicationFactorOne = 31;
optional uint64 highestPriorityLowRedundancyReplicatedBlocks = 32;
optional uint64 HighestPriorityLowRedundancyECBlocks = 33;
optional uint32 pendingSPSPaths = 34;
}
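
Because the field is optional, stats records serialized before this change still parse, and readers simply see the default of 0. A sketch of the generated builder round-trip, assuming standard protobuf-java codegen (imports and the generated outer class omitted):

// Hypothetical usage of the generated record class.
NamenodeMembershipStatsRecordProto stats =
    NamenodeMembershipStatsRecordProto.newBuilder()
        .setPendingSPSPaths(42)
        .build();
int pending = stats.getPendingSPSPaths();            // 42
int legacy = NamenodeMembershipStatsRecordProto
    .getDefaultInstance().getPendingSPSPaths();      // 0 for pre-upgrade records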
message NamenodeMembershipRecordProto {


@@ -219,6 +219,8 @@ public void testNameserviceStatsDataSource()
json.getLong("numOfEnteringMaintenanceDataNodes"));
assertEquals(stats.getProvidedSpace(),
json.getLong("providedSpace"));
assertEquals(stats.getPendingSPSPaths(),
json.getInt("pendingSPSPaths"));
nameservicesFound++;
}
assertEquals(getNameservices().size(), nameservicesFound);
@@ -296,6 +298,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
long highestPriorityLowRedundancyReplicatedBlocks = 0;
long highestPriorityLowRedundancyECBlocks = 0;
long numFiles = 0;
int pendingSPSPaths = 0;
for (MembershipState mock : getActiveMemberships()) {
MembershipStats stats = mock.getStats();
numBlocks += stats.getNumOfBlocks();
@@ -316,6 +319,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
stats.getHighestPriorityLowRedundancyReplicatedBlocks();
highestPriorityLowRedundancyECBlocks +=
stats.getHighestPriorityLowRedundancyECBlocks();
pendingSPSPaths += stats.getPendingSPSPaths();
}
assertEquals(numBlocks, bean.getNumBlocks());
@@ -342,6 +346,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
bean.getHighestPriorityLowRedundancyReplicatedBlocks());
assertEquals(highestPriorityLowRedundancyECBlocks,
bean.getHighestPriorityLowRedundancyECBlocks());
assertEquals(pendingSPSPaths, bean.getPendingSPSPaths());
}
private void validateClusterStatsRouterBean(RouterMBean bean) {


@@ -269,6 +269,7 @@ public static MembershipState createMockRegistrationForNamenode(
stats.setNumOfDecomActiveDatanodes(15);
stats.setNumOfDecomDeadDatanodes(5);
stats.setNumOfBlocks(10);
stats.setPendingSPSPaths(10);
entry.setStats(stats);
return entry;
}


@@ -298,6 +298,14 @@ public long getTotalECBlockGroups() {
return blocksMap.getECBlockGroups();
}
/** Used by metrics. */
public int getPendingSPSPaths() {
if (spsManager != null) {
return spsManager.getPendingSPSPaths();
}
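// SPS is disabled (no satisfier manager was created), so report an empty queue.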
return 0;
}
/**
* redundancyRecheckInterval is how often namenode checks for new
* reconstruction work.


@@ -4875,6 +4875,12 @@ public long getCurrentTokensCount() {
dtSecretManager.getCurrentTokensSize() : -1;
}
@Override
@Metric({"PendingSPSPaths", "The number of paths to be processed by storage policy satisfier"})
public int getPendingSPSPaths() {
return blockManager.getPendingSPSPaths();
}
/**
* Returns the length of the wait Queue for the FSNameSystemLock.
*


@@ -254,4 +254,11 @@ public interface FSNamesystemMBean {
* @return number of DTs
*/
long getCurrentTokensCount();
/**
* Returns the number of paths to be processed by storage policy satisfier.
*
* @return The number of paths to be processed by sps.
*/
int getPendingSPSPaths();
}


@@ -60,7 +60,7 @@ public class StoragePolicySatisfyManager {
private final StoragePolicySatisfier spsService;
private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode;
- private final Queue<Long> pathsToBeTraveresed;
+ private final Queue<Long> pathsToBeTraversed;
private final int outstandingPathsLimit;
private final Namesystem namesystem;
@@ -77,7 +77,7 @@ public StoragePolicySatisfyManager(Configuration conf,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
mode = StoragePolicySatisfierMode.fromString(modeVal);
- pathsToBeTraveresed = new LinkedList<Long>();
+ pathsToBeTraversed = new LinkedList<Long>();
this.namesystem = namesystem;
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
@@ -218,8 +218,8 @@ public boolean isSatisfierRunning() {
* storages.
*/
public Long getNextPathId() {
- synchronized (pathsToBeTraveresed) {
- return pathsToBeTraveresed.poll();
+ synchronized (pathsToBeTraversed) {
+ return pathsToBeTraversed.poll();
}
}
@@ -228,7 +228,7 @@ public Long getNextPathId() {
* @throws IOException
*/
public void verifyOutstandingPathQLimit() throws IOException {
- long size = pathsToBeTraveresed.size();
+ long size = pathsToBeTraversed.size();
// Checking that the SPS call Q exceeds the allowed limit.
if (outstandingPathsLimit - size <= 0) {
LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
@@ -244,15 +244,15 @@ public void verifyOutstandingPathQLimit() throws IOException {
* @throws IOException
*/
private void clearPathIds(){
- synchronized (pathsToBeTraveresed) {
- Iterator<Long> iterator = pathsToBeTraveresed.iterator();
+ synchronized (pathsToBeTraversed) {
+ Iterator<Long> iterator = pathsToBeTraversed.iterator();
while (iterator.hasNext()) {
Long trackId = iterator.next();
try {
namesystem.removeXattr(trackId,
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
} catch (IOException e) {
LOG.debug("Failed to remove sps xatttr!", e);
LOG.debug("Failed to remove sps xattr!", e);
}
iterator.remove();
}
@@ -263,8 +263,8 @@ private void clearPathIds(){
* Clean up all sps path ids.
*/
public void removeAllPathIds() {
- synchronized (pathsToBeTraveresed) {
- pathsToBeTraveresed.clear();
+ synchronized (pathsToBeTraversed) {
+ pathsToBeTraversed.clear();
}
}
@@ -273,8 +273,8 @@ public void removeAllPathIds() {
* @param id
*/
public void addPathId(long id) {
- synchronized (pathsToBeTraveresed) {
- pathsToBeTraveresed.add(id);
+ synchronized (pathsToBeTraversed) {
+ pathsToBeTraversed.add(id);
}
}
@@ -292,4 +292,11 @@ public boolean isEnabled() {
public StoragePolicySatisfierMode getMode() {
return mode;
}
/**
* @return the number of paths to be processed by storage policy satisfier.
*/
public int getPendingSPSPaths() {
return pathsToBeTraversed.size();
}
}
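
Pulling the pieces together, the manager keeps one queue of inode ids: callers enqueue with addPathId, the satisfier drains with getNextPathId, and the new gauge reports the backlog. A self-contained sketch of that pattern; like the code above, the gauge reads size() without locking, accepting a slightly stale value for metrics:

import java.util.LinkedList;
import java.util.Queue;

// Minimal sketch of the SPS path-queue pattern shown above.
final class SpsPathQueueSketch {
  private final Queue<Long> pathsToBeTraversed = new LinkedList<>();

  void addPathId(long id) {               // producer: satisfyStoragePolicy calls
    synchronized (pathsToBeTraversed) {
      pathsToBeTraversed.add(id);
    }
  }

  Long getNextPathId() {                  // consumer: the satisfier thread
    synchronized (pathsToBeTraversed) {
      return pathsToBeTraversed.poll();
    }
  }

  int getPendingSPSPaths() {              // gauge: backlog size for metrics
    return pathsToBeTraversed.size();
  }
}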


@@ -36,6 +36,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@@ -202,7 +203,15 @@ private void createCluster() throws IOException {
private void createCluster(boolean createMoverPath) throws IOException {
getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
- STORAGES_PER_DATANODE, CAPACITY, createMoverPath));
+ STORAGES_PER_DATANODE, CAPACITY, createMoverPath, true));
getFS();
writeContent(FILE);
}
private void createClusterDoNotStartSPS() throws IOException {
getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
STORAGES_PER_DATANODE, CAPACITY, true, false));
getFS();
writeContent(FILE);
}
@@ -211,12 +220,12 @@ private MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
return startCluster(conf, storageTypes, numberOfDatanodes, storagesPerDn,
- nodeCapacity, false);
+ nodeCapacity, false, true);
}
private MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
- long nodeCapacity, boolean createMoverPath) throws IOException {
+ long nodeCapacity, boolean createMoverPath, boolean startSPS) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
for (int i = 0; i < numberOfDatanodes; i++) {
for (int j = 0; j < storagesPerDn; j++) {
@@ -228,14 +237,16 @@ private MiniDFSCluster startCluster(final Configuration conf,
.storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive();
- nnc = DFSTestUtil.getNameNodeConnector(getConf(),
-     HdfsServerConstants.MOVER_ID_PATH, 1, createMoverPath);
- externalSps = new StoragePolicySatisfier(getConf());
- externalCtxt = new ExternalSPSContext(externalSps, nnc);
- externalSps.init(externalCtxt);
- externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
+ if (startSPS) {
+   nnc = DFSTestUtil.getNameNodeConnector(getConf(),
+       HdfsServerConstants.MOVER_ID_PATH, 1, createMoverPath);
+   externalSps = new StoragePolicySatisfier(getConf());
+   externalCtxt = new ExternalSPSContext(externalSps, nnc);
+   externalSps.init(externalCtxt);
+   externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
+ }
return cluster;
}
@@ -1515,6 +1526,20 @@ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
}
}
@Test(timeout = 300000)
public void testExternalSPSMetrics()
throws Exception {
try {
createClusterDoNotStartSPS();
dfs.satisfyStoragePolicy(new Path(FILE));
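// No external satisfier is running to drain the queue, so the single path stays pending.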
// Assert metrics.
assertEquals(1, hdfsCluster.getNamesystem().getPendingSPSPaths());
} finally {
shutdownCluster();
}
}
private static void createDirectoryTree(DistributedFileSystem dfs)
throws Exception {
// tree structure