HDFS-16488. [SPS]: Expose metrics to JMX for external SPS (#4234)
This commit is contained in:
parent
006767eb94
commit
f1e5f8e764
@ -301,6 +301,16 @@ public int getAttemptedItemsCount() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<AttemptedItemInfo> getStorageMovementAttemptedItems() {
|
||||||
|
return storageMovementAttemptedItems;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public BlockingQueue<Block> getMovementFinishedBlocks() {
|
||||||
|
return movementFinishedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
public void clearQueues() {
|
public void clearQueues() {
|
||||||
movementFinishedBlocks.clear();
|
movementFinishedBlocks.clear();
|
||||||
synchronized (storageMovementAttemptedItems) {
|
synchronized (storageMovementAttemptedItems) {
|
||||||
|
@ -1077,7 +1077,7 @@ public void clearQueues() {
|
|||||||
* attempted or reported time stamp. This is used by
|
* attempted or reported time stamp. This is used by
|
||||||
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
|
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
|
||||||
*/
|
*/
|
||||||
final static class AttemptedItemInfo extends ItemInfo {
|
public final static class AttemptedItemInfo extends ItemInfo {
|
||||||
private long lastAttemptedOrReportedTime;
|
private long lastAttemptedOrReportedTime;
|
||||||
private final Set<Block> blocks;
|
private final Set<Block> blocks;
|
||||||
|
|
||||||
@ -1095,7 +1095,7 @@ final static class AttemptedItemInfo extends ItemInfo {
|
|||||||
* @param retryCount
|
* @param retryCount
|
||||||
* file retry count
|
* file retry count
|
||||||
*/
|
*/
|
||||||
AttemptedItemInfo(long rootId, long trackId,
|
public AttemptedItemInfo(long rootId, long trackId,
|
||||||
long lastAttemptedOrReportedTime,
|
long lastAttemptedOrReportedTime,
|
||||||
Set<Block> blocks, int retryCount) {
|
Set<Block> blocks, int retryCount) {
|
||||||
super(rootId, trackId, retryCount);
|
super(rootId, trackId, retryCount);
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
@ -39,10 +40,12 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
|
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
|
import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
|
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
|
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
|
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -62,6 +65,7 @@ public class ExternalSPSContext implements Context {
|
|||||||
private final FileCollector fileCollector;
|
private final FileCollector fileCollector;
|
||||||
private final BlockMoveTaskHandler externalHandler;
|
private final BlockMoveTaskHandler externalHandler;
|
||||||
private final BlockMovementListener blkMovementListener;
|
private final BlockMovementListener blkMovementListener;
|
||||||
|
private ExternalSPSBeanMetrics spsBeanMetrics;
|
||||||
|
|
||||||
public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
|
public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
|
||||||
this.service = service;
|
this.service = service;
|
||||||
@ -208,4 +212,17 @@ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
|
|||||||
LOG.info("Movement attempted blocks", actualBlockMovements);
|
LOG.info("Movement attempted blocks", actualBlockMovements);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initMetrics(StoragePolicySatisfier sps) {
|
||||||
|
spsBeanMetrics = new ExternalSPSBeanMetrics(sps);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closeMetrics() {
|
||||||
|
spsBeanMetrics.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ExternalSPSBeanMetrics getSpsBeanMetrics() {
|
||||||
|
return spsBeanMetrics;
|
||||||
|
}
|
||||||
}
|
}
|
@ -48,8 +48,7 @@
|
|||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class ExternalStoragePolicySatisfier {
|
public final class ExternalStoragePolicySatisfier {
|
||||||
public static final Logger LOG = LoggerFactory
|
public static final Logger LOG = LoggerFactory.getLogger(ExternalStoragePolicySatisfier.class);
|
||||||
.getLogger(ExternalStoragePolicySatisfier.class);
|
|
||||||
|
|
||||||
private ExternalStoragePolicySatisfier() {
|
private ExternalStoragePolicySatisfier() {
|
||||||
// This is just a class to start and run external sps.
|
// This is just a class to start and run external sps.
|
||||||
@ -60,6 +59,7 @@ private ExternalStoragePolicySatisfier() {
|
|||||||
*/
|
*/
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
NameNodeConnector nnc = null;
|
NameNodeConnector nnc = null;
|
||||||
|
ExternalSPSContext context = null;
|
||||||
try {
|
try {
|
||||||
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
|
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
|
||||||
LOG);
|
LOG);
|
||||||
@ -69,9 +69,10 @@ public static void main(String[] args) throws Exception {
|
|||||||
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
|
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
|
||||||
nnc = getNameNodeConnector(spsConf);
|
nnc = getNameNodeConnector(spsConf);
|
||||||
|
|
||||||
ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
|
context = new ExternalSPSContext(sps, nnc);
|
||||||
sps.init(context);
|
sps.init(context);
|
||||||
sps.start(StoragePolicySatisfierMode.EXTERNAL);
|
sps.start(StoragePolicySatisfierMode.EXTERNAL);
|
||||||
|
context.initMetrics(sps);
|
||||||
if (sps != null) {
|
if (sps != null) {
|
||||||
sps.join();
|
sps.join();
|
||||||
}
|
}
|
||||||
@ -82,6 +83,11 @@ public static void main(String[] args) throws Exception {
|
|||||||
if (nnc != null) {
|
if (nnc != null) {
|
||||||
nnc.close();
|
nnc.close();
|
||||||
}
|
}
|
||||||
|
if (context!= null) {
|
||||||
|
if (context.getSpsBeanMetrics() != null) {
|
||||||
|
context.closeMetrics();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,100 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.sps.metrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||||
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.management.NotCompliantMBeanException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.StandardMBean;
|
||||||
|
import java.util.HashSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Expose the ExternalSPS metrics.
|
||||||
|
*/
|
||||||
|
public class ExternalSPSBeanMetrics implements ExternalSPSMXBean {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ExternalSPSBeanMetrics.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ExternalSPS bean.
|
||||||
|
*/
|
||||||
|
private ObjectName externalSPSBeanName;
|
||||||
|
private StoragePolicySatisfier storagePolicySatisfier;
|
||||||
|
|
||||||
|
public ExternalSPSBeanMetrics(StoragePolicySatisfier sps) {
|
||||||
|
try {
|
||||||
|
this.storagePolicySatisfier = sps;
|
||||||
|
StandardMBean bean = new StandardMBean(this, ExternalSPSMXBean.class);
|
||||||
|
this.externalSPSBeanName = MBeans.register("ExternalSPS", "ExternalSPS", bean);
|
||||||
|
LOG.info("Registered ExternalSPS MBean: {}", this.externalSPSBeanName);
|
||||||
|
} catch (NotCompliantMBeanException e) {
|
||||||
|
throw new RuntimeException("Bad externalSPS MBean setup", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unregister the JMX interfaces.
|
||||||
|
*/
|
||||||
|
public void close() {
|
||||||
|
if (externalSPSBeanName != null) {
|
||||||
|
MBeans.unregister(externalSPSBeanName);
|
||||||
|
externalSPSBeanName = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getProcessingQueueSize() {
|
||||||
|
return storagePolicySatisfier.processingQueueSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void updateProcessingQueueSize() {
|
||||||
|
storagePolicySatisfier.getStorageMovementQueue()
|
||||||
|
.add(new ItemInfo(0, 1, 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMovementFinishedBlocksCount() {
|
||||||
|
return storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocksCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void updateMovementFinishedBlocksCount() {
|
||||||
|
storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocks()
|
||||||
|
.add(new Block(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAttemptedItemsCount() {
|
||||||
|
return storagePolicySatisfier.getAttemptedItemsMonitor().getAttemptedItemsCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void updateAttemptedItemsCount() {
|
||||||
|
storagePolicySatisfier.getAttemptedItemsMonitor().getStorageMovementAttemptedItems()
|
||||||
|
.add(new StoragePolicySatisfier.AttemptedItemInfo(0, 1, 1, new HashSet<>(), 1));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,52 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.sps.metrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the JMX management interface for ExternalSPS information.
|
||||||
|
* End users shouldn't be implementing these interfaces, and instead
|
||||||
|
* access this information through the JMX APIs.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Stable
|
||||||
|
public interface ExternalSPSMXBean {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the queue size of StorageMovementNeeded.
|
||||||
|
*
|
||||||
|
* @return the queue size of StorageMovementNeeded.
|
||||||
|
*/
|
||||||
|
int getProcessingQueueSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the count of movement finished blocks.
|
||||||
|
*
|
||||||
|
* @return the count of movement finished blocks.
|
||||||
|
*/
|
||||||
|
int getMovementFinishedBlocksCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the count of attempted items.
|
||||||
|
*
|
||||||
|
* @return the count of attempted items.
|
||||||
|
*/
|
||||||
|
int getAttemptedItemsCount();
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This package provides the ability to expose external SPS metrics to JMX.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.sps.metrics;
|
@ -44,6 +44,7 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -84,6 +85,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
|
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
|
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||||
|
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
@ -102,6 +104,8 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.ObjectName;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1817,4 +1821,34 @@ public void clear() {
|
|||||||
actualBlockMovements.clear();
|
actualBlockMovements.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testExternalSPSMetricsExposedToJMX() throws Exception {
|
||||||
|
try {
|
||||||
|
createCluster();
|
||||||
|
// Start JMX but stop SPS thread to prevent mock data from being consumed.
|
||||||
|
externalSps.stop(true);
|
||||||
|
externalCtxt.initMetrics(externalSps);
|
||||||
|
|
||||||
|
ExternalSPSBeanMetrics spsBeanMetrics = externalCtxt.getSpsBeanMetrics();
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxBeanName = new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");
|
||||||
|
// Assert metrics before update.
|
||||||
|
assertEquals(0, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
|
||||||
|
assertEquals(0, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
|
||||||
|
assertEquals(0, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
|
||||||
|
|
||||||
|
// Update metrics.
|
||||||
|
spsBeanMetrics.updateAttemptedItemsCount();
|
||||||
|
spsBeanMetrics.updateProcessingQueueSize();
|
||||||
|
spsBeanMetrics.updateMovementFinishedBlocksCount();
|
||||||
|
|
||||||
|
// Assert metrics after update.
|
||||||
|
assertEquals(1, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
|
||||||
|
assertEquals(1, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
|
||||||
|
assertEquals(1, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
|
||||||
|
} finally {
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user