Steve Loughran c9270600b7
MAPREDUCE-7474. Improve Manifest committer resilience (#6716)
Improve task commit resilience everywhere
and add an option to reduce delete IO requests on
job cleanup (relevant for ABFS and HDFS).

Task Commit Resilience
----------------------

Task manifest saving is re-attempted on failure; the number of 
attempts made is configurable with the option:

  mapreduce.manifest.committer.manifest.save.attempts

* The default is 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt.
* Move from classic rename() to commitFile() to rename the file,
  after calling getFileStatus() to get its length and possibly etag.
  This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach
  the ResilientCommitByRename callbacks in abfs, which report on
  the outcome to the caller...which is then logged at WARN.
* New statistic task_stage_save_summary_file to distinguish from
  other saving operations (job success/report file).
  This is only saved to the manifest on task commit retries, and
  provides statistics on all previous unsuccessful attempts to save
  the manifests
+ test changes to match the codepath changes, including improvements
  in fault injection.

Directory size for deletion
---------------------------

New option

  mapreduce.manifest.committer.cleanup.parallel.delete.base.first

This attempts an initial attempt at deleting the base dir, only falling
back to parallel deletes if there's a timeout.

This option is disabled by default; Consider enabling it for abfs to
reduce IO load. Consult the documentation for more details.

Success file printing
---------------------

The command to print a JSON _SUCCESS file from this committer and
any S3A committer is now something which can be invoked from
the mapred command:

  mapred successfile <path to file>

Contributed by Steve Loughran
2024-05-13 21:12:34 +01:00

184 lines
6.4 KiB
Bash
Executable File

#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The name of the script being executed.
HADOOP_SHELL_EXECNAME="mapred"
MYNAME="${BASH_SOURCE-$0}"
## @description build up the mapred command's usage text.
## @audience public
## @stability stable
## @replaceable no
function hadoop_usage
{
hadoop_add_subcommand "classpath" client "prints the class path needed for running mapreduce subcommands"
hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
hadoop_add_subcommand "historyserver" daemon "run job history servers as a standalone daemon"
hadoop_add_subcommand "hsadmin" admin "job history server admin interface"
hadoop_add_subcommand "job" client "manipulate MapReduce jobs"
hadoop_add_subcommand "pipes" client "run a Pipes job"
hadoop_add_subcommand "queue" client "get information regarding JobQueues"
hadoop_add_subcommand "sampler" client "sampler"
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "minicluster" client "CLI MiniCluster"
hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
}
## @description Default command handler for hadoop command
## @audience public
## @stability stable
## @replaceable no
## @param CLI arguments
function mapredcmd_case
{
subcmd=$1
shift
case ${subcmd} in
mradmin|jobtracker|tasktracker|groups)
hadoop_error "Sorry, the ${subcmd} command is no longer supported."
hadoop_error "You may find similar functionality with the \"yarn\" shell command."
hadoop_exit_with_usage 1
;;
classpath)
hadoop_do_classpath_subcommand HADOOP_CLASSNAME "$@"
;;
envvars)
echo "JAVA_HOME='${JAVA_HOME}'"
echo "HADOOP_MAPRED_HOME='${HADOOP_MAPRED_HOME}'"
echo "MAPRED_DIR='${MAPRED_DIR}'"
echo "MAPRED_LIB_JARS_DIR='${MAPRED_LIB_JARS_DIR}'"
echo "HADOOP_CONF_DIR='${HADOOP_CONF_DIR}'"
echo "HADOOP_TOOLS_HOME='${HADOOP_TOOLS_HOME}'"
echo "HADOOP_TOOLS_DIR='${HADOOP_TOOLS_DIR}'"
echo "HADOOP_TOOLS_LIB_JARS_DIR='${HADOOP_TOOLS_LIB_JARS_DIR}'"
exit 0
;;
historyserver)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
if [[ -n "${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}" ]]; then
HADOOP_HEAPSIZE_MAX="${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}"
fi
HADOOP_DAEMON_ROOT_LOGGER=${HADOOP_JHS_LOGGER:-$HADOOP_DAEMON_ROOT_LOGGER}
if [[ "${HADOOP_DAEMON_MODE}" != "default" ]]; then
hadoop_add_param HADOOP_OPTS mapred.jobsummary.logger "-Dmapred.jobsummary.logger=${HADOOP_DAEMON_ROOT_LOGGER}"
fi
;;
hsadmin)
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
;;
job)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.JobClient
;;
pipes)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.pipes.Submitter
;;
queue)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.JobQueueClient
;;
sampler)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.lib.InputSampler
;;
frameworkuploader)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.uploader.FrameworkUploader
;;
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
successfile)
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
;;
minicluster)
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*'
junitjar=$(echo "${HADOOP_TOOLS_LIB_JARS_DIR}"/junit-[0-9]*.jar)
hadoop_add_classpath "${junitjar}"
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.MiniHadoopClusterManager
;;
*)
HADOOP_CLASSNAME="${subcmd}"
if ! hadoop_validate_classname "${HADOOP_CLASSNAME}"; then
hadoop_exit_with_usage 1
fi
;;
esac
}
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
# let's locate libexec...
if [[ -n "${HADOOP_HOME}" ]]; then
HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
else
HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
HADOOP_NEW_CONFIG=true
if [[ -f "${HADOOP_LIBEXEC_DIR}/mapred-config.sh" ]]; then
# shellcheck source=./hadoop-mapreduce-project/bin/mapred-config.sh
. "${HADOOP_LIBEXEC_DIR}/mapred-config.sh"
else
echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/mapred-config.sh." 2>&1
exit 1
fi
# now that we have support code, let's abs MYNAME so we can use it later
MYNAME=$(hadoop_abs "${MYNAME}")
if [ $# = 0 ]; then
hadoop_exit_with_usage 1
fi
HADOOP_SUBCMD=$1
shift
if hadoop_need_reexec mapred "${HADOOP_SUBCMD}"; then
hadoop_uservar_su mapred "${HADOOP_SUBCMD}" \
"${MYNAME}" \
"--reexec" \
"${HADOOP_USER_PARAMS[@]}"
exit $?
fi
hadoop_verify_user_perm "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"
HADOOP_SUBCMD_ARGS=("$@")
if declare -f mapred_subcommand_"${HADOOP_SUBCMD}" >/dev/null 2>&1; then
hadoop_debug "Calling dynamically: mapred_subcommand_${HADOOP_SUBCMD} ${HADOOP_SUBCMD_ARGS[*]}"
"mapred_subcommand_${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"
else
mapredcmd_case "${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"
fi
hadoop_add_client_opts
if [[ ${HADOOP_WORKER_MODE} = true ]]; then
hadoop_common_worker_mode_execute "${HADOOP_MAPRED_HOME}/bin/mapred" "${HADOOP_USER_PARAMS[@]}"
exit $?
fi
hadoop_subcommand_opts "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"
# everything is in globals at this point, so call the generic handler
hadoop_generic_java_subcmd_handler