YARN-10743. Add a policy that skips log aggregation for containers killed for exceeding the container log size limit. Contributed by Qi Zhu.

This commit is contained in:
Jim Brennan 2021-04-23 21:35:09 +00:00
parent ebf6b14c67
commit 1cbe35946b
3 changed files with 64 additions and 0 deletions

View File

@ -76,6 +76,8 @@
* SampleContainerLogAggregationPolicy: sample logs of successful worker * SampleContainerLogAggregationPolicy: sample logs of successful worker
* containers, in addition to application master and failed/killed * containers, in addition to application master and failed/killed
* containers. * containers.
* LimitSizeContainerLogAggregationPolicy: skip aggregation for containers
* that were killed for exceeding the container log size limit.
* If it isn't specified, it will use the cluster-wide default policy * If it isn't specified, it will use the cluster-wide default policy
* defined by configuration yarn.nodemanager.log-aggregation.policy.class. * defined by configuration yarn.nodemanager.log-aggregation.policy.class.
* The default value of yarn.nodemanager.log-aggregation.policy.class is * The default value of yarn.nodemanager.log-aggregation.policy.class is

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
/**
 * Log aggregation policy that aggregates the logs of every container except
 * those whose exit code is
 * {@link ContainerExitStatus#KILLED_FOR_EXCESS_LOGS}.
 */
@Private
public class LimitSizeContainerLogAggregationPolicy
    extends AbstractContainerLogAggregationPolicy {

  /**
   * Decides whether the logs of the given container should be aggregated.
   *
   * @param logContext context of the container; only its exit code is read
   * @return {@code false} when the exit code equals
   *         {@link ContainerExitStatus#KILLED_FOR_EXCESS_LOGS},
   *         {@code true} for any other exit code
   */
  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
    boolean killedForExcessLogs =
        logContext.getExitCode() == ContainerExitStatus.KILLED_FOR_EXCESS_LOGS;
    return !killedForExcessLogs;
  }
}

View File

@ -99,6 +99,7 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
@ -1868,6 +1869,35 @@ public void testFailedContainerPolicy() throws Exception {
verifyLogAggFinishEvent(appId); verifyLogAggFinishEvent(appId);
} }
/**
 * Runs three containers through a log aggregation service configured with
 * {@link LimitSizeContainerLogAggregationPolicy} and checks that only the
 * two containers that did not exit with KILLED_FOR_EXCESS_LOGS are passed
 * to the container-log verification.
 */
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testLimitSizeContainerLogAggregationPolicy() throws Exception {
  ApplicationId appId = createApplication();
  LogAggregationService service = createLogAggregationService(
      appId, LimitSizeContainerLogAggregationPolicy.class, null);
  String[] logFiles = {"stdout"};

  // Container 1: finished with exit code KILLED_FOR_EXCESS_LOGS. Its id is
  // deliberately not kept because it is not part of the expected result.
  finishContainer(
      appId, service, ContainerType.APPLICATION_MASTER, 1,
      ContainerExitStatus.KILLED_FOR_EXCESS_LOGS, logFiles);

  // Container 2: task that finished with exit code 0.
  ContainerId succeededTask = finishContainer(
      appId, service, ContainerType.TASK, 2, 0, logFiles);

  // Container 3: task that finished with the FORCE_KILLED exit code.
  int forceKilledExitCode =
      ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode();
  ContainerId forceKilledTask = finishContainer(
      appId, service, ContainerType.TASK, 3, forceKilledExitCode, logFiles);

  finishApplication(appId, service);

  // Only containers 2 and 3 are expected in the verification.
  ContainerId[] expectedContainers =
      new ContainerId[] {succeededTask, forceKilledTask};
  verifyContainerLogs(service, appId, expectedContainers,
      logFiles, 2, false, EMPTY_FILES);
  verifyLogAggFinishEvent(appId);
}
@Test (timeout = 50000) @Test (timeout = 50000)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testAMOrFailedContainerPolicy() throws Exception { public void testAMOrFailedContainerPolicy() throws Exception {