From 0b6625a9735f76ab473b41d8ab9b7f3c7678cfff Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 7 Nov 2018 08:20:49 -0600 Subject: [PATCH] MAPREDUCE-7148. Fast fail jobs when exceeds dfs quota limitation. Contributed by Wang Yan --- ...usterStorageCapacityExceededException.java | 51 ++++++++ .../hdfs/protocol/QuotaExceededException.java | 5 +- .../org/apache/hadoop/mapred/YarnChild.java | 28 ++++- .../apache/hadoop/mapred/TestYarnChild.java | 118 ++++++++++++++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 3 + .../src/main/resources/mapred-default.xml | 9 ++ 6 files changed, 209 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java new file mode 100644 index 0000000000..bbbf073cc2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Exception raised by HDFS indicating that storage capacity in the + * cluster filesystem is exceeded. See also + * https://issues.apache.org/jira/browse/MAPREDUCE-7148. + */ +@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" }) +@InterfaceStability.Evolving +public class ClusterStorageCapacityExceededException extends IOException { + private static final long serialVersionUID = 1L; + + public ClusterStorageCapacityExceededException() { + super(); + } + + public ClusterStorageCapacityExceededException(String message) { + super(message); + } + + public ClusterStorageCapacityExceededException(String message, + Throwable cause) { + super(message, cause); + } + + public ClusterStorageCapacityExceededException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java index f4e7f343f9..7033f3f8ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hdfs.protocol; -import 
java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; /** * This exception is thrown when modification to HDFS results in violation @@ -37,7 +36,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class QuotaExceededException extends IOException { +public class QuotaExceededException extends ClusterStorageCapacityExceededException { protected static final long serialVersionUID = 1L; protected String pathName=null; protected long quota; // quota diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index bd40e548c8..e81b090a67 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -26,10 +26,12 @@ import java.security.PrivilegedExceptionAction; import java.util.concurrent.ScheduledExecutorService; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.CallerContext; @@ -61,6 +63,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * The main() for MapReduce task processes. 
*/ @@ -205,8 +209,7 @@ public Object run() throws Exception { // Report back any failures, for diagnostic purposes if (taskid != null) { if (!ShutdownHookManager.get().isShutdownInProgress()) { - umbilical.fatalError(taskid, - StringUtils.stringifyException(exception), false); + reportError(exception, task, umbilical); } } } catch (Throwable throwable) { @@ -228,6 +231,27 @@ public Object run() throws Exception { } } + @VisibleForTesting + static void reportError(Exception exception, Task task, + TaskUmbilicalProtocol umbilical) throws IOException { + boolean fastFailJob = false; + boolean hasClusterStorageCapacityExceededException = + ExceptionUtils.indexOfType(exception, + ClusterStorageCapacityExceededException.class) != -1; + if (hasClusterStorageCapacityExceededException) { + boolean killJobWhenExceedClusterStorageCapacity = task.getConf() + .getBoolean(MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED, + MRJobConfig.DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED); + if (killJobWhenExceedClusterStorageCapacity) { + LOG.error( + "Fast fail the job because the cluster storage capacity was exceeded."); + fastFailJob = true; + } + } + umbilical.fatalError(taskid, StringUtils.stringifyException(exception), + fastFailJob); + } + /** * Utility method to check if the Encrypted Spill Key needs to be set into the * user credentials of the user running the Map / Reduce Task diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java new file mode 100644 index 0000000000..404b1cba9e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java @@ -0,0 +1,118 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor 
license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +/** + * Tests the behavior of YarnChild. 
+ */ +public class TestYarnChild { + private Task task; + private TaskUmbilicalProtocol umbilical; + private Configuration conf; + final static private String KILL_LIMIT_EXCEED_CONF_NAME = + "mapreduce.job.dfs.storage.capacity.kill-limit-exceed"; + + @Before + public void setUp() throws Exception { + task = mock(Task.class); + umbilical = mock(TaskUmbilicalProtocol.class); + conf = new Configuration(); + when(task.getConf()).thenReturn(conf); + } + + @Test + public void testReportErrorWhenCapacityExceptionNotHappenByDefault() + throws IOException { + Exception exception = new RuntimeException(new IOException()); + + verifyReportError(exception, false); + } + + @Test + public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailDisabled() + throws IOException { + Exception exception = new RuntimeException(new IOException()); + conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false); + + verifyReportError(exception, false); + } + + @Test + public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailEnabled() + throws IOException { + Exception exception = new RuntimeException(new IOException()); + conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true); + + verifyReportError(exception, false); + } + + @Test + public void testReportErrorWhenCapacityExceptionHappenByDefault() + throws IOException { + Exception exception = + new RuntimeException(new ClusterStorageCapacityExceededException()); + + verifyReportError(exception, false); + } + + @Test + public void testReportErrorWhenCapacityExceptionHappenAndFastFailDisabled() + throws IOException { + Exception exception = + new RuntimeException(new ClusterStorageCapacityExceededException()); + conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false); + + verifyReportError(exception, false); + } + + @Test + public void testReportErrorWhenCapacityExceptionHappenAndFastFailEnabled() + throws IOException { + Exception exception = + new RuntimeException(new ClusterStorageCapacityExceededException()); + 
conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true); + + verifyReportError(exception, true); + } + + @Test + public void testReportErrorWhenCapacityExceptionHappenInThirdOfExceptionChain() + throws IOException { + Exception exception = new RuntimeException(new IllegalStateException( + new ClusterStorageCapacityExceededException())); + conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true); + + verifyReportError(exception, true); + } + + private void verifyReportError(Exception exception, boolean fastFail) + throws IOException { + YarnChild.reportError(exception, task, umbilical); + verify(umbilical).fatalError(any(TaskAttemptID.class), anyString(), + eq(fastFail)); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 3592b3d75d..565c05200d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -57,6 +57,9 @@ public interface MRJobConfig { // negative values disable the limit public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1; + public static final String JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED = + "mapreduce.job.dfs.storage.capacity.kill-limit-exceed"; + public static final boolean DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED = false; public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed"; // setting to false only logs the kill diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index ccc9c3df60..c993537ad7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -84,6 +84,15 @@ is only logged in the container logs. + + mapreduce.job.dfs.storage.capacity.kill-limit-exceed + false + Whether to fast fail the task when it exceeds the allocated storage + capacity in the cluster filesystem (ClusterStorageCapacityExceededException + happens), for example, when the dfs quota limitation is exceeded. If true, the + task will fast fail. If false, the task will get retried. + + mapreduce.job.maps 2