diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java
new file mode 100644
index 0000000000..bbbf073cc2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Exception raised by HDFS indicating that the storage capacity of the
+ * cluster filesystem has been exceeded. See also
+ * https://issues.apache.org/jira/browse/MAPREDUCE-7148.
+ */
+@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
+@InterfaceStability.Evolving
+public class ClusterStorageCapacityExceededException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public ClusterStorageCapacityExceededException() {
+ super();
+ }
+
+ public ClusterStorageCapacityExceededException(String message) {
+ super(message);
+ }
+
+ public ClusterStorageCapacityExceededException(String message,
+ Throwable cause) {
+ super(message, cause);
+ }
+
+ public ClusterStorageCapacityExceededException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
index f4e7f343f9..7033f3f8ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
@@ -18,10 +18,9 @@
package org.apache.hadoop.hdfs.protocol;
-import java.io.IOException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
/**
* This exception is thrown when modification to HDFS results in violation
@@ -37,7 +36,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class QuotaExceededException extends IOException {
+public class QuotaExceededException extends ClusterStorageCapacityExceededException {
protected static final long serialVersionUID = 1L;
protected String pathName=null;
protected long quota; // quota
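With QuotaExceededException now extending the new type, the HDFS quota subclasses (DSQuotaExceededException, NSQuotaExceededException, and friends) all inherit from ClusterStorageCapacityExceededException, so client code can use a single catch clause for any capacity violation. A minimal sketch, with a hypothetical path and payload:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CapacityAwareWrite {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataOutputStream out = fs.create(new Path("/tmp/example.dat"))) {
          out.writeBytes("payload");
        } catch (ClusterStorageCapacityExceededException e) {
          // DSQuotaExceededException, NSQuotaExceededException, etc. all
          // extend QuotaExceededException and therefore land here.
          System.err.println("Cluster storage capacity exceeded: " + e);
          throw e;
        }
      }
    }
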
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index bd40e548c8..e81b090a67 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -26,10 +26,12 @@
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.CallerContext;
@@ -61,6 +63,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The main() for MapReduce task processes.
*/
@@ -205,8 +209,7 @@ public Object run() throws Exception {
// Report back any failures, for diagnostic purposes
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
- umbilical.fatalError(taskid,
- StringUtils.stringifyException(exception), false);
+ reportError(exception, task, umbilical);
}
}
} catch (Throwable throwable) {
@@ -228,6 +231,33 @@ public Object run() throws Exception {
}
}
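+  /**
+   * Report a task failure to the application master through the umbilical,
+   * requesting a fast fail of the whole job when the exception chain
+   * contains a ClusterStorageCapacityExceededException and the job is
+   * configured to be killed in that case.
+   */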
+ @VisibleForTesting
+ static void reportError(Exception exception, Task task,
+ TaskUmbilicalProtocol umbilical) throws IOException {
+ boolean fastFailJob = false;
+ boolean hasClusterStorageCapacityExceededException =
+ ExceptionUtils.indexOfType(exception,
+ ClusterStorageCapacityExceededException.class) != -1;
+ if (hasClusterStorageCapacityExceededException) {
+ boolean killJobWhenExceedClusterStorageCapacity = task.getConf()
+ .getBoolean(MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED,
+ MRJobConfig.DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED);
+ if (killJobWhenExceedClusterStorageCapacity) {
+ LOG.error(
+ "Fast fail the job because the cluster storage capacity was exceeded.");
+ fastFailJob = true;
+ }
+ }
+ umbilical.fatalError(taskid, StringUtils.stringifyException(exception),
+ fastFailJob);
+ }
+
/**
* Utility method to check if the Encrypted Spill Key needs to be set into the
* user credentials of the user running the Map / Reduce Task
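The detection above leans on commons-lang3's ExceptionUtils.indexOfType, which walks the cause chain and matches subclasses. A minimal self-contained sketch of that behaviour (class name hypothetical), mirroring the deepest nesting exercised by the test below:

    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;

    public class IndexOfTypeDemo {
      public static void main(String[] args) {
        // Capacity exception buried two levels deep in the cause chain.
        Exception wrapped = new RuntimeException(new IllegalStateException(
            new ClusterStorageCapacityExceededException("quota exceeded")));
        // indexOfType matches the given class or any subclass anywhere in
        // the chain, returning its position (here 2) or -1 if absent.
        int index = ExceptionUtils.indexOfType(wrapped,
            ClusterStorageCapacityExceededException.class);
        System.out.println(index != -1); // prints: true
      }
    }
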
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java
new file mode 100644
index 0000000000..404b1cba9e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java
@@ -0,0 +1,118 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the behavior of YarnChild.
+ */
+public class TestYarnChild {
+ private Task task;
+ private TaskUmbilicalProtocol umbilical;
+ private Configuration conf;
+  private static final String KILL_LIMIT_EXCEED_CONF_NAME =
+      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";
+
+ @Before
+ public void setUp() throws Exception {
+ task = mock(Task.class);
+ umbilical = mock(TaskUmbilicalProtocol.class);
+ conf = new Configuration();
+ when(task.getConf()).thenReturn(conf);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionNotHappenByDefault()
+ throws IOException {
+ Exception exception = new RuntimeException(new IOException());
+
+ verifyReportError(exception, false);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailDisabled()
+ throws IOException {
+ Exception exception = new RuntimeException(new IOException());
+ conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);
+
+ verifyReportError(exception, false);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailEnabled()
+ throws IOException {
+ Exception exception = new RuntimeException(new IOException());
+ conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+ verifyReportError(exception, false);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionHappenByDefault()
+ throws IOException {
+ Exception exception =
+ new RuntimeException(new ClusterStorageCapacityExceededException());
+
+ verifyReportError(exception, false);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionHappenAndFastFailDisabled()
+ throws IOException {
+ Exception exception =
+ new RuntimeException(new ClusterStorageCapacityExceededException());
+ conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);
+
+ verifyReportError(exception, false);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionHappenAndFastFailEnabled()
+ throws IOException {
+ Exception exception =
+ new RuntimeException(new ClusterStorageCapacityExceededException());
+ conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+ verifyReportError(exception, true);
+ }
+
+ @Test
+ public void testReportErrorWhenCapacityExceptionHappenInThirdOfExceptionChain()
+ throws IOException {
+ Exception exception = new RuntimeException(new IllegalStateException(
+ new ClusterStorageCapacityExceededException()));
+ conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+ verifyReportError(exception, true);
+ }
+
+ private void verifyReportError(Exception exception, boolean fastFail)
+ throws IOException {
+ YarnChild.reportError(exception, task, umbilical);
+ verify(umbilical).fatalError(any(TaskAttemptID.class), anyString(),
+ eq(fastFail));
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 3592b3d75d..565c05200d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -57,6 +57,10 @@ public interface MRJobConfig {
   // negative values disable the limit
   public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
+  public static final String JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED =
+      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";
+  public static final boolean
+      DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED = false;
public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
"mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
// setting to false only logs the kill
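For reference, a minimal sketch (class and job name hypothetical) of enabling the new switch programmatically rather than in mapred-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class FastFailJobSetup {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fail the whole job on the first capacity-exceeded error instead
        // of retrying the failed task attempt.
        conf.setBoolean(
            MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED, true);
        Job job = Job.getInstance(conf, "capacity-fast-fail-example");
        // ... configure mapper, reducer and I/O paths, then job.submit() ...
      }
    }
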
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ccc9c3df60..c993537ad7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -84,6 +84,15 @@
     is only logged in the container logs.</description>
 </property>
 
+<property>
+  <name>mapreduce.job.dfs.storage.capacity.kill-limit-exceed</name>
+  <value>false</value>
+  <description>Whether to fast fail the task when the allocated storage
+    capacity in the cluster filesystem is exceeded (i.e. when a
+    ClusterStorageCapacityExceededException is raised), for example when the
+    DFS quota is exceeded. If true, the task fails fast; if false, the task
+    is retried.</description>
+</property>
+
 <property>
   <name>mapreduce.job.maps</name>
   <value>2</value>
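
The same switch can also be set cluster- or job-wide by overriding the default in mapred-site.xml; a sketch of the override:

    <property>
      <name>mapreduce.job.dfs.storage.capacity.kill-limit-exceed</name>
      <value>true</value>
    </property>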