YARN-6125. The application attempt's diagnostic message should have a maximum size

(Contributed by Andras Piros via Daniel Templeton)
This commit is contained in:
Daniel Templeton 2017-02-17 13:40:58 -08:00
parent 9a92837786
commit c7a36e6130
6 changed files with 422 additions and 13 deletions

View File

@ -2619,6 +2619,11 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0;
public static final String APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC =
YARN_PREFIX + "app.attempt.diagnostics.limit.kc";
public static final int DEFAULT_APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC = 64;
@Private
public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(

View File

@ -3022,4 +3022,19 @@
<value>3000</value>
</property>
<property>
<description>
Defines the limit of the diagnostics message of an application
attempt, in kilo characters (character count * 1024).
When using ZooKeeper to store application state behavior, it's
important to limit the size of the diagnostic messages to
prevent YARN from overwhelming ZooKeeper. In cases where
yarn.resourcemanager.state-store.max-completed-applications is set to
a large number, it may be desirable to reduce the value of this property
to limit the total data stored.
</description>
<name>yarn.app.attempt.diagnostics.limit.kc</name>
<value>64</value>
</property>
</configuration>

View File

@ -52,8 +52,17 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<!--
junit must be before mockito-all on the classpath. mockito-all bundles its
own copy of the hamcrest classes, but they don't match our junit version.
-->
<dependency>
<groupId>org.mockito</groupId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
@ -72,11 +81,6 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -119,6 +120,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
"%s State change from %s to %s on event = %s";
private static final String RECOVERY_MESSAGE =
"Recovering attempt: %s with final state = %s";
private static final String DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE =
"The value of %s should be a positive integer: %s";
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
@ -127,6 +130,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public final static Priority AM_CONTAINER_PRIORITY = recordFactory
.newRecordInstance(Priority.class);
static {
AM_CONTAINER_PRIORITY.setPriority(0);
}
@ -171,7 +175,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
private FinalApplicationStatus finalStatus = null;
private final StringBuilder diagnostics = new StringBuilder();
private final BoundedAppender diagnostics;
private int amContainerExitStatus = ContainerExitStatus.INVALID;
private Configuration conf;
@ -518,6 +522,45 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.amReq = amReq;
this.blacklistedNodesForAM = amBlacklistManager;
final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf);
if (LOG.isDebugEnabled()) {
LOG.debug(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC + " : " +
diagnosticsLimitKC);
}
this.diagnostics = new BoundedAppender(diagnosticsLimitKC * 1024);
}
private int getDiagnosticsLimitKCOrThrow(final Configuration configuration) {
try {
final int diagnosticsLimitKC = configuration.getInt(
YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC,
YarnConfiguration.DEFAULT_APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC);
if (diagnosticsLimitKC <= 0) {
final String message =
String.format(DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE,
YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC,
diagnosticsLimitKC);
LOG.error(message);
throw new YarnRuntimeException(message);
}
return diagnosticsLimitKC;
} catch (final NumberFormatException ignored) {
final String diagnosticsLimitKCString = configuration
.get(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC);
final String message =
String.format(DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE,
YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC,
diagnosticsLimitKCString);
LOG.error(message);
throw new YarnRuntimeException(message);
}
}
@Override
@ -738,6 +781,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
}
@VisibleForTesting
void appendDiagnostics(final CharSequence message) {
this.diagnostics.append(message);
}
public int getAMContainerExitStatus() {
this.readLock.lock();
try {
@ -926,8 +974,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
attemptState.getState()));
}
diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics());
this.diagnostics.append("Attempt recovered after RM restart");
this.diagnostics.append(attemptState.getDiagnostics());
this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
@ -942,7 +990,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.startTime = attemptState.getStartTime();
this.finishTime = attemptState.getFinishTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
attemptState.getPreemptedMemorySeconds(),
attemptState.getPreemptedVcoreSeconds());
@ -1655,8 +1703,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private void setAMContainerCrashedDiagnosticsAndExitStatus(
RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus();
String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
this.diagnostics.append(diagnostics);
this.diagnostics.append(getAMContainerCrashedDiagnostics(finishEvent));
this.amContainerExitStatus = status.getExitStatus();
}
@ -1825,7 +1872,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
progress = 1.0f;
RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event;
diagnostics.append(unregisterEvent.getDiagnosticMsg());
this.diagnostics.append(unregisterEvent.getDiagnosticMsg());
originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
@ -2232,4 +2279,115 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
return Collections.EMPTY_SET;
}
/**
* A {@link CharSequence} appender that considers its {@link #limit} as upper
* bound.
* <p>
* When {@link #limit} would be reached on append, past messages will be
* truncated from head, and a header telling the user about truncation will be
* prepended, with ellipses in between header and messages.
* <p>
* Note that header and ellipses are not counted against {@link #limit}.
* <p>
* An example:
*
* <pre>
* {@code
* // At the beginning it's an empty string
* final Appendable shortAppender = new BoundedAppender(80);
* // The whole message fits into limit
* shortAppender.append(
* "message1 this is a very long message but fitting into limit\n");
* // The first message is truncated, the second not
* shortAppender.append("message2 this is shorter than the previous one\n");
* // The first message is deleted, the second truncated, the third
* // preserved
* shortAppender.append("message3 this is even shorter message, maybe.\n");
* // The first two are deleted, the third one truncated, the last preserved
* shortAppender.append("message4 the shortest one, yet the greatest :)");
* // Current contents are like this:
* // Diagnostic messages truncated, showing last 80 chars out of 199:
* // ...s is even shorter message, maybe.
* // message4 the shortest one, yet the greatest :)
* }
* </pre>
* <p>
* Note that <tt>null</tt> values are {@link #append(CharSequence) append}ed
* just like in {@link StringBuilder#append(CharSequence) original
* implementation}.
* <p>
* Note that this class is not thread safe.
*/
@VisibleForTesting
static class BoundedAppender {
@VisibleForTesting
static final String TRUNCATED_MESSAGES_TEMPLATE =
"Diagnostic messages truncated, showing last "
+ "%d chars out of %d:%n...%s";
private final int limit;
private final StringBuilder messages = new StringBuilder();
private int totalCharacterCount = 0;
BoundedAppender(final int limit) {
Preconditions.checkArgument(limit > 0, "limit should be positive");
this.limit = limit;
}
/**
* Append a {@link CharSequence} considering {@link #limit}, truncating
* from the head of {@code csq} or {@link #messages} when necessary.
*
* @param csq the {@link CharSequence} to append
* @return this
*/
BoundedAppender append(final CharSequence csq) {
appendAndCount(csq);
checkAndCut();
return this;
}
private void appendAndCount(final CharSequence csq) {
final int before = messages.length();
messages.append(csq);
final int after = messages.length();
totalCharacterCount += after - before;
}
private void checkAndCut() {
if (messages.length() > limit) {
final int newStart = messages.length() - limit;
messages.delete(0, newStart);
}
}
/**
* Get current length of messages considering truncates
* without header and ellipses.
*
* @return current length
*/
int length() {
return messages.length();
}
/**
* Get a string representation of the actual contents, displaying also a
* header and ellipses when there was a truncate.
*
* @return String representation of the {@link #messages}
*/
@Override
public String toString() {
if (messages.length() < totalCharacterCount) {
return String.format(TRUNCATED_MESSAGES_TEMPLATE, messages.length(),
totalCharacterCount, messages.toString());
}
return messages.toString();
}
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.BoundedAppender;
/**
* Test class for {@link BoundedAppender}.
*/
public class TestBoundedAppender {
@Rule
public ExpectedException expected = ExpectedException.none();
@Test
public void initWithZeroLimitThrowsException() {
expected.expect(IllegalArgumentException.class);
expected.expectMessage("limit should be positive");
new BoundedAppender(0);
}
@Test
public void nullAppendedNullStringRead() {
final BoundedAppender boundedAppender = new BoundedAppender(4);
boundedAppender.append(null);
assertEquals("null appended, \"null\" read", "null",
boundedAppender.toString());
}
@Test
public void appendBelowLimitOnceValueIsReadCorrectly() {
final BoundedAppender boundedAppender = new BoundedAppender(2);
boundedAppender.append("ab");
assertEquals("value appended is read correctly", "ab",
boundedAppender.toString());
}
@Test
public void appendValuesBelowLimitAreReadCorrectlyInFifoOrder() {
final BoundedAppender boundedAppender = new BoundedAppender(3);
boundedAppender.append("ab");
boundedAppender.append("cd");
boundedAppender.append("e");
boundedAppender.append("fg");
assertEquals("last values appended fitting limit are read correctly",
String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"),
boundedAppender.toString());
}
@Test
public void appendLastAboveLimitPreservesLastMessagePostfix() {
final BoundedAppender boundedAppender = new BoundedAppender(3);
boundedAppender.append("ab");
boundedAppender.append("cde");
boundedAppender.append("fghij");
assertEquals(
"last value appended above limit postfix is read correctly", String
.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 10, "hij"),
boundedAppender.toString());
}
@Test
public void appendMiddleAboveLimitPreservesLastMessageAndMiddlePostfix() {
final BoundedAppender boundedAppender = new BoundedAppender(3);
boundedAppender.append("ab");
boundedAppender.append("cde");
assertEquals("last value appended above limit postfix is read correctly",
String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 5, "cde"),
boundedAppender.toString());
boundedAppender.append("fg");
assertEquals(
"middle value appended above limit postfix and last value are "
+ "read correctly",
String.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 7, "efg"),
boundedAppender.toString());
boundedAppender.append("hijkl");
assertEquals(
"last value appended above limit postfix is read correctly", String
.format(BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 3, 12, "jkl"),
boundedAppender.toString());
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Testing {@link RMAppAttemptImpl#diagnostics} scenarios.
*/
public class TestRMAppAttemptImplDiagnostics {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void whenCreatedWithDefaultConfigurationSuccess() {
final Configuration configuration = new Configuration();
configuration.setInt(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC,
YarnConfiguration.DEFAULT_APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC);
createRMAppAttemptImpl(configuration);
}
@Test
public void whenCreatedWithWrongConfigurationError() {
final Configuration configuration = new Configuration();
configuration.setInt(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, 0);
expectedException.expect(YarnRuntimeException.class);
createRMAppAttemptImpl(configuration);
}
@Test
public void whenAppendedWithinLimitMessagesArePreserved() {
final Configuration configuration = new Configuration();
configuration.setInt(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, 1);
final RMAppAttemptImpl appAttempt = createRMAppAttemptImpl(configuration);
final String withinLimit = RandomStringUtils.random(1024);
appAttempt.appendDiagnostics(withinLimit);
assertEquals("messages within limit should be preserved", withinLimit,
appAttempt.getDiagnostics());
}
@Test
public void whenAppendedBeyondLimitMessagesAreTruncated() {
final Configuration configuration = new Configuration();
configuration.setInt(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, 1);
final RMAppAttemptImpl appAttempt = createRMAppAttemptImpl(configuration);
final String beyondLimit = RandomStringUtils.random(1025);
appAttempt.appendDiagnostics(beyondLimit);
final String truncated = String.format(
RMAppAttemptImpl.BoundedAppender.TRUNCATED_MESSAGES_TEMPLATE, 1024,
1025, beyondLimit.substring(1));
assertEquals("messages beyond limit should be truncated", truncated,
appAttempt.getDiagnostics());
}
private RMAppAttemptImpl createRMAppAttemptImpl(
final Configuration configuration) {
final ApplicationAttemptId mockApplicationAttemptId =
mock(ApplicationAttemptId.class);
final ApplicationId mockApplicationId = mock(ApplicationId.class);
when(mockApplicationAttemptId.getApplicationId())
.thenReturn(mockApplicationId);
final RMContext mockRMContext = mock(RMContext.class);
final Dispatcher mockDispatcher = mock(Dispatcher.class);
when(mockRMContext.getDispatcher()).thenReturn(mockDispatcher);
return new RMAppAttemptImpl(mockApplicationAttemptId, mockRMContext, null,
null, null, configuration, false, null);
}
}