YARN-1529: Add Localization overhead metrics to NM. Contributed by Jim_Brennan.
(cherry picked from commit e0c9653166
)
This commit is contained in:
parent
2efa28cb79
commit
0cb1b178ec
@ -224,6 +224,15 @@ enum Environment {
|
||||
@Private
|
||||
CLASSPATH_PREPEND_DISTCACHE("CLASSPATH_PREPEND_DISTCACHE"),
|
||||
|
||||
/**
|
||||
* $LOCALIZATION_COUNTERS
|
||||
*
|
||||
* Since NM does not RPC Container JVM's we pass Localization counter
|
||||
* vector as an environment variable
|
||||
*
|
||||
*/
|
||||
LOCALIZATION_COUNTERS("LOCALIZATION_COUNTERS"),
|
||||
|
||||
/**
|
||||
* $CONTAINER_ID
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
|
@ -212,7 +212,7 @@ private enum ReInitOp {
|
||||
private final ResourceLocalizationService rsrcLocalizationSrvc;
|
||||
private final AbstractContainersLauncher containersLauncher;
|
||||
private final AuxServices auxiliaryServices;
|
||||
private final NodeManagerMetrics metrics;
|
||||
@VisibleForTesting final NodeManagerMetrics metrics;
|
||||
|
||||
protected final NodeStatusUpdater nodeStatusUpdater;
|
||||
|
||||
|
@ -138,4 +138,14 @@ <T> T getContainerRuntimeData(Class<T> runtimeClazz)
|
||||
* @return localization statuses.
|
||||
*/
|
||||
List<LocalizationStatus> getLocalizationStatuses();
|
||||
|
||||
/**
|
||||
* Vector of localization counters to be passed from NM to application
|
||||
* container via environment variable {@code $LOCALIZATION_COUNTERS}. See
|
||||
* {@link org.apache.hadoop.yarn.api.ApplicationConstants.Environment#LOCALIZATION_COUNTERS}
|
||||
*
|
||||
* @return coma-separated counter values
|
||||
*/
|
||||
String localizationCountersAsString();
|
||||
|
||||
}
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
@ -100,6 +101,14 @@
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class ContainerImpl implements Container {
|
||||
private enum LocalizationCounter {
|
||||
// 1-to-1 correspondence with MR TaskCounter.LOCALIZED_*
|
||||
BYTES_MISSED,
|
||||
BYTES_CACHED,
|
||||
FILES_MISSED,
|
||||
FILES_CACHED,
|
||||
MILLIS;
|
||||
}
|
||||
|
||||
private static final class ReInitializationContext {
|
||||
private final ContainerLaunchContext newLaunchContext;
|
||||
@ -153,6 +162,9 @@ private ReInitializationContext createContextForRollback() {
|
||||
private final NMStateStoreService stateStore;
|
||||
private final Credentials credentials;
|
||||
private final NodeManagerMetrics metrics;
|
||||
private final long[] localizationCounts =
|
||||
new long[LocalizationCounter.values().length];
|
||||
|
||||
private volatile ContainerLaunchContext launchContext;
|
||||
private volatile ContainerTokenIdentifier containerTokenIdentifier;
|
||||
private final ContainerId containerId;
|
||||
@ -1211,6 +1223,12 @@ public ContainerState transition(ContainerImpl container,
|
||||
}
|
||||
|
||||
container.containerLocalizationStartTime = clock.getTime();
|
||||
// duration = end - start;
|
||||
// record in RequestResourcesTransition: -start
|
||||
// add in LocalizedTransition: +end
|
||||
//
|
||||
container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]
|
||||
= -Time.monotonicNow();
|
||||
|
||||
// Send requests for public, private resources
|
||||
Map<String, LocalResource> cntrRsrc;
|
||||
@ -1259,6 +1277,21 @@ public ContainerState transition(ContainerImpl container,
|
||||
return ContainerState.LOCALIZING;
|
||||
}
|
||||
|
||||
final long localizedSize = rsrcEvent.getSize();
|
||||
if (localizedSize > 0) {
|
||||
container.localizationCounts
|
||||
[LocalizationCounter.BYTES_MISSED.ordinal()] += localizedSize;
|
||||
container.localizationCounts
|
||||
[LocalizationCounter.FILES_MISSED.ordinal()]++;
|
||||
} else if (localizedSize < 0) {
|
||||
// cached: recorded negative, restore the sign
|
||||
container.localizationCounts
|
||||
[LocalizationCounter.BYTES_CACHED.ordinal()] -= localizedSize;
|
||||
container.localizationCounts
|
||||
[LocalizationCounter.FILES_CACHED.ordinal()]++;
|
||||
}
|
||||
container.metrics.localizationCacheHitMiss(localizedSize);
|
||||
|
||||
// check to see if this resource should be uploaded to the shared cache
|
||||
// as well
|
||||
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
|
||||
@ -1269,6 +1302,14 @@ public ContainerState transition(ContainerImpl container,
|
||||
return ContainerState.LOCALIZING;
|
||||
}
|
||||
|
||||
// duration = end - start;
|
||||
// record in RequestResourcesTransition: -start
|
||||
// add in LocalizedTransition: +end
|
||||
//
|
||||
container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]
|
||||
+= Time.monotonicNow();
|
||||
container.metrics.localizationComplete(
|
||||
container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]);
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerLocalizationEvent(LocalizationEventType.
|
||||
CONTAINER_RESOURCES_LOCALIZED, container));
|
||||
@ -2301,4 +2342,14 @@ public <T> T getContainerRuntimeData(Class<T> runtimeClass)
|
||||
}
|
||||
return runtimeClass.cast(containerRuntimeData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String localizationCountersAsString() {
|
||||
StringBuilder result =
|
||||
new StringBuilder(String.valueOf(localizationCounts[0]));
|
||||
for (int i = 1; i < localizationCounts.length; i++) {
|
||||
result.append(',').append(localizationCounts[i]);
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,11 @@ public class ContainerResourceLocalizedEvent extends ContainerResourceEvent {
|
||||
|
||||
private final Path loc;
|
||||
|
||||
// > 0: downloaded
|
||||
// < 0: cached
|
||||
//
|
||||
private long size;
|
||||
|
||||
public ContainerResourceLocalizedEvent(ContainerId container, LocalResourceRequest rsrc,
|
||||
Path loc) {
|
||||
super(container, ContainerEventType.RESOURCE_LOCALIZED, rsrc);
|
||||
@ -35,4 +40,12 @@ public Path getLocation() {
|
||||
return loc;
|
||||
}
|
||||
|
||||
public long getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public void setSize(long size) {
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1620,6 +1620,9 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
|
||||
|
||||
addToEnvMap(environment, nmVars, Environment.PWD.name(), pwd.toString());
|
||||
|
||||
addToEnvMap(environment, nmVars, Environment.LOCALIZATION_COUNTERS.name(),
|
||||
container.localizationCountersAsString());
|
||||
|
||||
if (!Shell.WINDOWS) {
|
||||
addToEnvMap(environment, nmVars, "JVM_PID", "$$");
|
||||
}
|
||||
|
@ -244,9 +244,11 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) {
|
||||
Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
|
||||
rsrc.size = locEvent.getSize();
|
||||
for (ContainerId container : rsrc.ref) {
|
||||
rsrc.dispatcher.getEventHandler().handle(
|
||||
final ContainerResourceLocalizedEvent localizedEvent =
|
||||
new ContainerResourceLocalizedEvent(
|
||||
container, rsrc.rsrc, rsrc.localPath));
|
||||
container, rsrc.rsrc, rsrc.localPath);
|
||||
localizedEvent.setSize(rsrc.size);
|
||||
rsrc.dispatcher.getEventHandler().handle(localizedEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -281,9 +283,11 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) {
|
||||
ResourceRequestEvent reqEvent = (ResourceRequestEvent) event;
|
||||
ContainerId container = reqEvent.getContext().getContainerId();
|
||||
rsrc.ref.add(container);
|
||||
rsrc.dispatcher.getEventHandler().handle(
|
||||
final ContainerResourceLocalizedEvent localizedEvent =
|
||||
new ContainerResourceLocalizedEvent(
|
||||
container, rsrc.rsrc, rsrc.localPath));
|
||||
container, rsrc.rsrc, rsrc.localPath);
|
||||
localizedEvent.setSize(-rsrc.size);
|
||||
rsrc.dispatcher.getEventHandler().handle(localizedEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
|
||||
@ -98,6 +99,21 @@ public class NodeManagerMetrics {
|
||||
@Metric("Current CPU utilization")
|
||||
MutableGaugeFloat nodeCpuUtilization;
|
||||
|
||||
@Metric("Missed localization requests in bytes")
|
||||
MutableCounterLong localizedCacheMissBytes;
|
||||
@Metric("Cached localization requests in bytes")
|
||||
MutableCounterLong localizedCacheHitBytes;
|
||||
@Metric("Localization cache hit ratio (bytes)")
|
||||
MutableGaugeInt localizedCacheHitBytesRatio;
|
||||
@Metric("Missed localization requests (files)")
|
||||
MutableCounterLong localizedCacheMissFiles;
|
||||
@Metric("Cached localization requests (files)")
|
||||
MutableCounterLong localizedCacheHitFiles;
|
||||
@Metric("Localization cache hit ratio (files)")
|
||||
MutableGaugeInt localizedCacheHitFilesRatio;
|
||||
@Metric("Container localization time in milliseconds")
|
||||
MutableRate localizationDurationMillis;
|
||||
|
||||
// CHECKSTYLE:ON:VisibilityModifier
|
||||
|
||||
private JvmMetrics jvmMetrics = null;
|
||||
@ -411,4 +427,38 @@ public float getNodeCpuUtilization() {
|
||||
public void setNodeCpuUtilization(float cpuUtilization) {
|
||||
this.nodeCpuUtilization.set(cpuUtilization);
|
||||
}
|
||||
|
||||
private void updateLocalizationHitRatios() {
|
||||
updateLocalizationHitRatio(localizedCacheHitBytes, localizedCacheMissBytes,
|
||||
localizedCacheHitBytesRatio);
|
||||
updateLocalizationHitRatio(localizedCacheHitFiles, localizedCacheMissFiles,
|
||||
localizedCacheHitFilesRatio);
|
||||
}
|
||||
|
||||
private static void updateLocalizationHitRatio(MutableCounterLong hitCounter,
|
||||
MutableCounterLong missedCounter, MutableGaugeInt ratioGauge) {
|
||||
final long hits = hitCounter.value();
|
||||
final long misses = missedCounter.value();
|
||||
final long total = hits + misses;
|
||||
if (total > 0) {
|
||||
ratioGauge.set((int)(100 * hits / total));
|
||||
}
|
||||
}
|
||||
|
||||
public void localizationCacheHitMiss(long size) {
|
||||
if (size > 0) {
|
||||
localizedCacheMissBytes.incr(size);
|
||||
localizedCacheMissFiles.incr();
|
||||
updateLocalizationHitRatios();
|
||||
} else if (size < 0) {
|
||||
// cached: recorded negative, restore the sign
|
||||
localizedCacheHitBytes.incr(-size);
|
||||
localizedCacheHitFiles.incr();
|
||||
updateLocalizationHitRatios();
|
||||
}
|
||||
}
|
||||
|
||||
public void localizationComplete(long downloadMillis) {
|
||||
localizationDurationMillis.add(downloadMillis);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,10 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalizationStatus;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -43,6 +47,7 @@
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -50,8 +55,10 @@
|
||||
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -321,6 +328,39 @@ public void testContainerSetup() throws Exception {
|
||||
BufferedReader reader = new BufferedReader(new FileReader(targetFile));
|
||||
Assert.assertEquals("Hello World!", reader.readLine());
|
||||
Assert.assertEquals(null, reader.readLine());
|
||||
|
||||
//
|
||||
// check the localization counter
|
||||
//
|
||||
long targetFileSize =
|
||||
FileUtil.getDU(targetFile.getCanonicalFile().getParentFile());
|
||||
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
||||
assertCounter("LocalizedCacheMissBytes", targetFileSize, rb);
|
||||
assertCounter("LocalizedCacheHitBytes", 0L, rb);
|
||||
assertCounter("LocalizedCacheMissFiles", 1L, rb);
|
||||
assertCounter("LocalizedCacheHitFiles", 0L, rb);
|
||||
assertGaugeGt("LocalizationDurationMillisAvgTime", 0, rb);
|
||||
assertGauge("LocalizedCacheHitBytesRatio", 0, rb);
|
||||
assertGauge("LocalizedCacheHitFilesRatio", 0, rb);
|
||||
|
||||
// test cache being used
|
||||
final ContainerId cid1 = createContainerId(1);
|
||||
containerManager.startContainers(StartContainersRequest.newInstance(
|
||||
Collections.singletonList(
|
||||
StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(cid1, DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user,
|
||||
context.getContainerTokenSecretManager())))));
|
||||
waitForContainerState(containerManager, cid1, ContainerState.COMPLETE);
|
||||
rb = getMetrics("NodeManagerMetrics");
|
||||
assertCounter("LocalizedCacheMissBytes", targetFileSize, rb);
|
||||
assertCounter("LocalizedCacheHitBytes", targetFileSize, rb);
|
||||
assertCounter("LocalizedCacheMissFiles", 1L, rb);
|
||||
assertCounter("LocalizedCacheHitFiles", 1L, rb);
|
||||
assertGauge("LocalizedCacheHitBytesRatio", 50, rb);
|
||||
assertGauge("LocalizedCacheHitFilesRatio", 50, rb);
|
||||
}
|
||||
|
||||
@Test (timeout = 10000L)
|
||||
|
@ -865,6 +865,7 @@ private void verifyTailErrorLogOnContainerExit(Configuration conf,
|
||||
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 1);
|
||||
when(container.getContainerId()).thenReturn(containerId);
|
||||
when(container.getUser()).thenReturn("test");
|
||||
when(container.localizationCountersAsString()).thenReturn("");
|
||||
String relativeContainerLogDir = ContainerLaunch.getRelativeContainerLogDir(
|
||||
appId.toString(), containerId.toString());
|
||||
Path containerLogDir =
|
||||
|
@ -123,6 +123,11 @@ public String toString() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String localizationCountersAsString() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet getResourceSet() {
|
||||
return null;
|
||||
|
Loading…
Reference in New Issue
Block a user