YARN-5797. Add metrics to the node manager for cleaning the PUBLIC and PRIVATE caches. (Chris Trezzo via mingma)

This commit is contained in:
Ming Ma 2017-04-06 16:54:43 -07:00
parent 0eacd4c13b
commit 0116c3c957
8 changed files with 111 additions and 28 deletions

View File

@ -230,7 +230,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
this.metrics = metrics;
rsrcLocalizationSrvc =
createResourceLocalizationService(exec, deletionContext, context);
createResourceLocalizationService(exec, deletionContext, context,
metrics);
addService(rsrcLocalizationSrvc);
containersLauncher = createContainersLauncher(context, exec);
@ -477,9 +478,10 @@ public ContainersMonitor getContainersMonitor() {
}
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context) {
ContainerExecutor exec, DeletionService deletionContext,
Context nmContext, NodeManagerMetrics nmMetrics) {
return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext, dirsHandler, context);
deletionContext, dirsHandler, nmContext, nmMetrics);
}
protected SharedCacheUploadService createSharedCacheUploaderService() {

View File

@ -131,6 +131,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@ -165,6 +166,8 @@ public class ResourceLocalizationService extends CompositeService
private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager;
private NMStateStoreService stateStore;
@VisibleForTesting
final NodeManagerMetrics metrics;
@VisibleForTesting
LocalResourcesTracker publicRsrc;
@ -194,7 +197,8 @@ public class ResourceLocalizationService extends CompositeService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler, Context context) {
LocalDirsHandlerService dirsHandler, Context context,
NodeManagerMetrics metrics) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@ -208,6 +212,7 @@ public ResourceLocalizationService(Dispatcher dispatcher,
.build());
this.stateStore = context.getNMStateStore();
this.nmContext = context;
this.metrics = metrics;
}
FileContext getLocalFileContext(Configuration conf) {
@ -530,6 +535,12 @@ LocalCacheCleanerStats handleCacheCleanup() {
} else if (LOG.isInfoEnabled()) {
LOG.info(stats.toString());
}
// Update metrics
metrics.setCacheSizeBeforeClean(stats.getCacheSizeBeforeClean());
metrics.setTotalBytesDeleted(stats.getTotalDelSize());
metrics.setPrivateBytesDeleted(stats.getPrivateDelSize());
metrics.setPublicBytesDeleted(stats.getPublicDelSize());
return stats;
}

View File

@ -69,6 +69,15 @@ public class NodeManagerMetrics {
@Metric("# of running opportunistic containers")
MutableGaugeInt runningOpportunisticContainers;
@Metric("Local cache size (public and private) before clean (Bytes)")
MutableGaugeLong cacheSizeBeforeClean;
@Metric("# of total bytes deleted from the public and private local cache")
MutableGaugeLong totalBytesDeleted;
@Metric("# of bytes deleted from the public local cache")
MutableGaugeLong publicBytesDeleted;
@Metric("# of bytes deleted from the private local cache")
MutableGaugeLong privateBytesDeleted;
// CHECKSTYLE:ON:VisibilityModifier
private JvmMetrics jvmMetrics = null;
@ -215,6 +224,22 @@ public void setGoodLogDirsDiskUtilizationPerc(
this.goodLogDirsDiskUtilizationPerc.set(goodLogDirsDiskUtilizationPerc);
}
public void setCacheSizeBeforeClean(long cacheSizeBeforeClean) {
this.cacheSizeBeforeClean.set(cacheSizeBeforeClean);
}
public void setTotalBytesDeleted(long totalBytesDeleted) {
this.totalBytesDeleted.set(totalBytesDeleted);
}
public void setPublicBytesDeleted(long publicBytesDeleted) {
this.publicBytesDeleted.set(publicBytesDeleted);
}
public void setPrivateBytesDeleted(long privateBytesDeleted) {
this.privateBytesDeleted.set(privateBytesDeleted);
}
public int getRunningContainers() {
return containersRunning.value();
}
@ -275,4 +300,20 @@ public int getAllocatedOpportunisticVCores() {
public int getRunningOpportunisticContainers() {
return runningOpportunisticContainers.value();
}
public long getCacheSizeBeforeClean() {
return this.cacheSizeBeforeClean.value();
}
public long getTotalBytesDeleted() {
return this.totalBytesDeleted.value();
}
public long getPublicBytesDeleted() {
return this.publicBytesDeleted.value();
}
public long getPrivateBytesDeleted() {
return this.privateBytesDeleted.value();
}
}

View File

@ -73,9 +73,10 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
@Override
@SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context) {
ContainerExecutor exec, DeletionService deletionContext, Context context,
NodeManagerMetrics metrics) {
return new ResourceLocalizationService(super.dispatcher, exec,
deletionContext, super.dirsHandler, context) {
deletionContext, super.dirsHandler, context, metrics) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {

View File

@ -99,6 +99,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -699,8 +700,10 @@ private void waitForAppState(Application app, ApplicationState state)
private ContainerManagerImpl createContainerManager(Context context) {
final LogHandler logHandler = mock(LogHandler.class);
final NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
final ResourceLocalizationService rsrcSrv =
new ResourceLocalizationService(null, null, null, null, context) {
new ResourceLocalizationService(null, null, null, null, context,
metrics) {
@Override
public void serviceInit(Configuration conf) throws Exception {
}
@ -739,8 +742,10 @@ protected LogHandler createLogHandler(Configuration conf,
}
@Override
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext, Context context) {
protected ResourceLocalizationService
createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext,
Context context, NodeManagerMetrics metrics) {
return rsrcSrv;
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.junit.Test;
/**
@ -80,8 +81,12 @@ public void testBasicCleanup() {
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc().size());
assertEquals(100, stats.getTotalDelSize());
assertEquals(100, rls.metrics.getTotalBytesDeleted());
assertEquals(60, stats.getPublicDelSize());
assertEquals(60, rls.metrics.getPublicBytesDeleted());
assertEquals(40, stats.getPrivateDelSize());
assertEquals(40, rls.metrics.getPrivateBytesDeleted());
assertEquals(100, rls.metrics.getCacheSizeBeforeClean());
}
@Test
@ -105,8 +110,12 @@ public void testPositiveRefCount() {
assertEquals(1, resources.getLocalRsrc().size());
assertTrue(resources.getLocalRsrc().containsKey(survivor));
assertEquals(20, stats.getTotalDelSize());
assertEquals(20, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(0, stats.getPrivateDelSize());
assertEquals(0, rls.metrics.getPrivateBytesDeleted());
assertEquals(40, rls.metrics.getCacheSizeBeforeClean());
}
@Test
@ -164,8 +173,12 @@ public void testLRUAcrossTrackers() {
assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
assertEquals(80, stats.getTotalDelSize());
assertEquals(80, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(60, stats.getPrivateDelSize());
assertEquals(60, rls.metrics.getPrivateBytesDeleted());
assertEquals(160, rls.metrics.getCacheSizeBeforeClean());
}
private ResourceLocalizationService createLocService(
@ -174,8 +187,10 @@ private ResourceLocalizationService createLocService(
long targetCacheSize) {
Context mockedContext = mock(Context.class);
when(mockedContext.getNMStateStore()).thenReturn(null);
NodeManagerMetrics metrics = NodeManagerMetrics.create();
ResourceLocalizationService rls =
new ResourceLocalizationService(null, null, null, null, mockedContext);
new ResourceLocalizationService(null, null, null, null, mockedContext,
metrics);
// We set the following members directly so we don't have to deal with
// mocking out the service init method.
rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.junit.Assert;
import static org.mockito.Mockito.mock;
import org.junit.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -83,8 +85,10 @@ public void testMinimumPerDirectoryFileLimit() {
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false, conf);
NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null, nmContext);
new ResourceLocalizationService(null, null, null, null, nmContext,
metrics);
try {
service.init(conf);
} catch (Exception e1) {

View File

@ -70,7 +70,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
@ -141,6 +140,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -171,6 +171,7 @@ public class TestResourceLocalizationService {
private AbstractFileSystem spylfs;
private FileContext lfs;
private NMContext nmContext;
private NodeManagerMetrics metrics;
@BeforeClass
public static void setupClass() {
mockServer = mock(Server.class);
@ -189,6 +190,7 @@ public void setup() throws IOException {
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf);
metrics = mock(NodeManagerMetrics.class);
}
@After
@ -225,7 +227,7 @@ public void testLocalizationInit() throws Exception {
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler, nmContext));
diskhandler, nmContext, metrics));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@ -286,7 +288,7 @@ public void testDirectoryCleanupOnNewlyCreatedStateStore()
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler,nmContext));
diskhandler, nmContext, metrics));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@ -357,7 +359,7 @@ public void testResourceRelease() throws Exception {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);
dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@ -757,7 +759,7 @@ public void testLocalizerRunnerException() throws Exception {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, nmContext);
dirsHandlerSpy, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
try {
@ -847,7 +849,7 @@ public void testLocalizationHeartbeat() throws Exception {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);
dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -1143,7 +1145,8 @@ public void testDownloadingResourcesOnContainerKill() throws Exception {
DrainDispatcher dispatcher = getDispatcher(conf);
ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, delService, dirsHandler, nmContext);
dispatcher, exec, delService, dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -1452,7 +1455,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, spyContext);
dirsHandler, spyContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -1547,7 +1550,8 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
dirsHandler.init(conf);
// Start resource localization service.
ResourceLocalizationService rawService = new ResourceLocalizationService(
dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext);
dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext,
metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).
@ -1666,7 +1670,7 @@ public void testFailedPublicResource() throws Exception {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);
dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -1775,7 +1779,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, nmContext);
dirsHandlerSpy, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@ -1907,7 +1911,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, nmContext);
localDirHandler, nmContext, metrics);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -2060,7 +2064,7 @@ public void testLocalResourcePath() throws Exception {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler, nmContext);
localDirHandler, nmContext, metrics);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@ -2226,7 +2230,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception {
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler, nmContext);
dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
@ -2532,7 +2536,7 @@ private ResourceLocalizationService createSpyService(
new ApplicationACLsManager(conf), stateStore, false, conf);
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);
dirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@ -2596,7 +2600,7 @@ public void testFailedDirsResourceRelease() throws Exception {
// setup mocks
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
mockDirsHandler, nmContext);
mockDirsHandler, nmContext, metrics);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(