MAPREDUCE-7148. Fast fail jobs when they exceed the dfs quota limitation. Contributed by Wang Yan

Jason Lowe 2018-11-07 08:20:49 -06:00
parent 8dc1f6dbf7
commit 0b6625a973
6 changed files with 209 additions and 5 deletions

View File: org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Exception raised by HDFS indicating that storage capacity in the
 * cluster filesystem is exceeded. See also
 * https://issues.apache.org/jira/browse/MAPREDUCE-7148.
 */
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
@InterfaceStability.Evolving
public class ClusterStorageCapacityExceededException extends IOException {
  private static final long serialVersionUID = 1L;

  public ClusterStorageCapacityExceededException() {
    super();
  }

  public ClusterStorageCapacityExceededException(String message) {
    super(message);
  }

  public ClusterStorageCapacityExceededException(String message,
      Throwable cause) {
    super(message, cause);
  }

  public ClusterStorageCapacityExceededException(Throwable cause) {
    super(cause);
  }
}

View File: org/apache/hadoop/hdfs/protocol/QuotaExceededException.java

@@ -18,10 +18,9 @@
 package org.apache.hadoop.hdfs.protocol;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
 
 /**
  * This exception is thrown when modification to HDFS results in violation
@@ -37,7 +36,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class QuotaExceededException extends IOException {
+public class QuotaExceededException extends ClusterStorageCapacityExceededException {
   protected static final long serialVersionUID = 1L;
   protected String pathName=null;
   protected long quota; // quota

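With QuotaExceededException now a subtype of ClusterStorageCapacityExceededException, a single catch clause can handle the whole family of capacity failures, including the quota exceptions that extend QuotaExceededException. A minimal client-side sketch of that, not part of this commit; the output path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CapacityAwareWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
      // Hypothetical write; a DFS quota violation here now surfaces
      // as a ClusterStorageCapacityExceededException subtype.
      fs.create(new Path("/tmp/capacity-demo")).close();
    } catch (ClusterStorageCapacityExceededException e) {
      // After this change, this also covers QuotaExceededException
      // and its subclasses.
      System.err.println("Cluster storage capacity exceeded: " + e);
    }
  }
}
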
View File: org/apache/hadoop/mapred/YarnChild.java

@@ -26,10 +26,12 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.CallerContext;
@@ -61,6 +63,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The main() for MapReduce task processes.
  */
@@ -205,8 +209,7 @@ public Object run() throws Exception {
       // Report back any failures, for diagnostic purposes
       if (taskid != null) {
         if (!ShutdownHookManager.get().isShutdownInProgress()) {
-          umbilical.fatalError(taskid,
-              StringUtils.stringifyException(exception), false);
+          reportError(exception, task, umbilical);
         }
       }
     } catch (Throwable throwable) {
@@ -228,6 +231,27 @@ public Object run() throws Exception {
     }
   }
 
+  @VisibleForTesting
+  static void reportError(Exception exception, Task task,
+      TaskUmbilicalProtocol umbilical) throws IOException {
+    boolean fastFailJob = false;
+    boolean hasClusterStorageCapacityExceededException =
+        ExceptionUtils.indexOfType(exception,
+            ClusterStorageCapacityExceededException.class) != -1;
+    if (hasClusterStorageCapacityExceededException) {
+      boolean killJobWhenExceedClusterStorageCapacity = task.getConf()
+          .getBoolean(MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED,
+              MRJobConfig.DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED);
+      if (killJobWhenExceedClusterStorageCapacity) {
+        LOG.error(
+            "Fast fail the job because the cluster storage capacity was exceeded.");
+        fastFailJob = true;
+      }
+    }
+    umbilical.fatalError(taskid, StringUtils.stringifyException(exception),
+        fastFailJob);
+  }
+
   /**
    * Utility method to check if the Encrypted Spill Key needs to be set into the
    * user credentials of the user running the Map / Reduce Task

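The fast-fail decision above hinges on commons-lang3's ExceptionUtils.indexOfType, which walks the cause chain and matches subclasses, so the capacity exception is found even when it is wrapped several levels deep. A standalone sketch of that behavior, not part of the commit; the wrapping mirrors the chain test below:

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;

public class ChainCheckDemo {
  public static void main(String[] args) {
    // Capacity exception buried two causes deep, as in the chain test below.
    Exception e = new RuntimeException(new IllegalStateException(
        new ClusterStorageCapacityExceededException("quota exceeded")));

    // indexOfType returns the 0-based position in the cause chain, or -1.
    int idx = ExceptionUtils.indexOfType(e,
        ClusterStorageCapacityExceededException.class);
    System.out.println(idx);        // 2
    System.out.println(idx != -1);  // true: reportError would fast fail if enabled
  }
}
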
View File: org/apache/hadoop/mapred/TestYarnChild.java

@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.junit.Before;
import org.junit.Test;

import static org.mockito.Mockito.*;

/**
 * Tests the behavior of YarnChild.
 */
public class TestYarnChild {
  private Task task;
  private TaskUmbilicalProtocol umbilical;
  private Configuration conf;
  final static private String KILL_LIMIT_EXCEED_CONF_NAME =
      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";

  @Before
  public void setUp() throws Exception {
    task = mock(Task.class);
    umbilical = mock(TaskUmbilicalProtocol.class);
    conf = new Configuration();
    when(task.getConf()).thenReturn(conf);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionNotHappenByDefault()
      throws IOException {
    Exception exception = new RuntimeException(new IOException());

    verifyReportError(exception, false);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailDisabled()
      throws IOException {
    Exception exception = new RuntimeException(new IOException());
    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);

    verifyReportError(exception, false);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailEnabled()
      throws IOException {
    Exception exception = new RuntimeException(new IOException());
    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);

    verifyReportError(exception, false);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionHappenByDefault()
      throws IOException {
    Exception exception =
        new RuntimeException(new ClusterStorageCapacityExceededException());

    verifyReportError(exception, false);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionHappenAndFastFailDisabled()
      throws IOException {
    Exception exception =
        new RuntimeException(new ClusterStorageCapacityExceededException());
    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);

    verifyReportError(exception, false);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionHappenAndFastFailEnabled()
      throws IOException {
    Exception exception =
        new RuntimeException(new ClusterStorageCapacityExceededException());
    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);

    verifyReportError(exception, true);
  }

  @Test
  public void testReportErrorWhenCapacityExceptionHappenInThirdOfExceptionChain()
      throws IOException {
    Exception exception = new RuntimeException(new IllegalStateException(
        new ClusterStorageCapacityExceededException()));
    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);

    verifyReportError(exception, true);
  }

  private void verifyReportError(Exception exception, boolean fastFail)
      throws IOException {
    YarnChild.reportError(exception, task, umbilical);

    verify(umbilical).fatalError(any(TaskAttemptID.class), anyString(),
        eq(fastFail));
  }
}

View File: org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -57,6 +57,9 @@ public interface MRJobConfig {
   // negative values disable the limit
   public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
 
+  public static final String JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED =
+      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";
+  public static final boolean DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED = false;
   public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
       "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
   // setting to false only logs the kill

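A job can opt in programmatically by setting the new key on its configuration before submission. A hypothetical submission sketch; the job name and the rest of the pipeline are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SubmitWithFastFail {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Fail the whole job fast instead of retrying tasks that hit a DFS quota.
    conf.setBoolean(MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED, true);

    Job job = Job.getInstance(conf, "quota-sensitive-job");
    // ... configure mapper, reducer, input and output as usual, then:
    // System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The same effect should be achievable on the command line through the generic options parser, e.g. -Dmapreduce.job.dfs.storage.capacity.kill-limit-exceed=true.
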
View File: mapred-default.xml

@@ -84,6 +84,15 @@
     is only logged in the container logs.</description>
 </property>
 
+<property>
+  <name>mapreduce.job.dfs.storage.capacity.kill-limit-exceed</name>
+  <value>false</value>
+  <description>Whether to fast fail the task when it exceeds the allocated storage
+  capacity in the cluster filesystem (i.e. a ClusterStorageCapacityExceededException
+  occurs), for example when it exceeds the dfs quota limitation. If true,
+  the task fails fast; if false, the task is retried.</description>
+</property>
+
 <property>
   <name>mapreduce.job.maps</name>
   <value>2</value>