YARN-6066. Opportunistic containers Minor fixes : API annotations, parameter name changes, checkstyles. (asuresh)

(cherry picked from commit 4985217de453a04ddffd7b52644bdc8d153f753c)
This commit is contained in:
Arun Suresh 2016-12-30 08:46:10 -08:00
parent 679478d0c6
commit 85826f6ca5
6 changed files with 27 additions and 16 deletions

View File

@ -237,10 +237,10 @@ protected void serviceInit(Configuration conf) throws Exception {
// Init startTime to current time. If all goes well, it will be reset after // Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM. // first attempt to contact RM.
retrystartTime = System.currentTimeMillis(); retrystartTime = System.currentTimeMillis();
this.scheduledRequests.setNumOpportunisticMapsPer100( this.scheduledRequests.setNumOpportunisticMapsPercent(
conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE, conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENT,
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE)); MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
LOG.info(this.scheduledRequests.getNumOpportunisticMapsPer100() + LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
"% of the mappers will be scheduled using OPPORTUNISTIC containers"); "% of the mappers will be scheduled using OPPORTUNISTIC containers");
} }
@ -1056,14 +1056,14 @@ class ScheduledRequests {
final Map<TaskAttemptId, ContainerRequest> maps = final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>(); new LinkedHashMap<TaskAttemptId, ContainerRequest>();
int mapsMod100 = 0; int mapsMod100 = 0;
int numOpportunisticMapsPer100 = 0; int numOpportunisticMapsPercent = 0;
void setNumOpportunisticMapsPer100(int numMaps) { void setNumOpportunisticMapsPercent(int numMaps) {
this.numOpportunisticMapsPer100 = numMaps; this.numOpportunisticMapsPercent = numMaps;
} }
int getNumOpportunisticMapsPer100() { int getNumOpportunisticMapsPercent() {
return this.numOpportunisticMapsPer100; return this.numOpportunisticMapsPercent;
} }
@VisibleForTesting @VisibleForTesting
@ -1110,7 +1110,7 @@ void addMap(ContainerRequestEvent event) {
maps.put(event.getAttemptID(), request); maps.put(event.getAttemptID(), request);
addContainerReq(request); addContainerReq(request);
} else { } else {
if (mapsMod100 < numOpportunisticMapsPer100) { if (mapsMod100 < numOpportunisticMapsPercent) {
request = request =
new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP, new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
mapNodeLabelExpression); mapNodeLabelExpression);

View File

@ -1005,9 +1005,9 @@ public interface MRJobConfig {
* requested by the AM will be opportunistic. If the total number of maps * requested by the AM will be opportunistic. If the total number of maps
* for the job is less than 'x', then ALL maps will be OPPORTUNISTIC * for the job is less than 'x', then ALL maps will be OPPORTUNISTIC
*/ */
public static final String MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE = public static final String MR_NUM_OPPORTUNISTIC_MAPS_PERCENT =
"mapreduce.job.num-opportunistic-maps-percentage"; "mapreduce.job.num-opportunistic-maps-percent";
public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE = 0; public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0;
/** /**
* A comma-separated list of properties whose value will be redacted. * A comma-separated list of properties whose value will be redacted.

View File

@ -145,7 +145,7 @@ private void runMergeTest(JobConf job, FileSystem fileSystem, int
job.setNumReduceTasks(numReducers); job.setNumReduceTasks(numReducers);
// All OPPORTUNISTIC // All OPPORTUNISTIC
job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENTAGE, percent); job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENT, percent);
job.setInt("mapreduce.map.maxattempts", 1); job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1); job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapred.test.num_lines", numLines); job.setInt("mapred.test.num_lines", numLines);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
/** /**
@ -37,5 +38,6 @@ public enum ContainerState {
COMPLETE, COMPLETE,
/** Scheduled (awaiting resources) at the NM. */ /** Scheduled (awaiting resources) at the NM. */
@InterfaceStability.Unstable
SCHEDULED SCHEDULED
} }

View File

@ -314,6 +314,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Setting that controls whether opportunistic container allocation /** Setting that controls whether opportunistic container allocation
* is enabled or not. */ * is enabled or not. */
@Unstable
public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED =
RM_PREFIX + "opportunistic-container-allocation.enabled"; RM_PREFIX + "opportunistic-container-allocation.enabled";
public static final boolean public static final boolean
@ -321,12 +322,14 @@ public static boolean isAclEnabled(Configuration conf) {
/** Number of nodes to be used by the Opportunistic Container allocator for /** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during container allocation. */ * dispatching containers during container allocation. */
@Unstable
public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED = public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
RM_PREFIX + "opportunistic-container-allocation.nodes-used"; RM_PREFIX + "opportunistic-container-allocation.nodes-used";
public static final int DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED = public static final int DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
10; 10;
/** Frequency for computing least loaded NMs. */ /** Frequency for computing least loaded NMs. */
@Unstable
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
RM_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms"; RM_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
public static final long public static final long
@ -334,6 +337,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Comparator for determining node load for scheduling of opportunistic /** Comparator for determining node load for scheduling of opportunistic
* containers. */ * containers. */
@Unstable
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR = public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
RM_PREFIX + "nm-container-queuing.load-comparator"; RM_PREFIX + "nm-container-queuing.load-comparator";
public static final String DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR = public static final String DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
@ -341,6 +345,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Value of standard deviation used for calculation of queue limit /** Value of standard deviation used for calculation of queue limit
* thresholds. */ * thresholds. */
@Unstable
public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV = public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV =
RM_PREFIX + "nm-container-queuing.queue-limit-stdev"; RM_PREFIX + "nm-container-queuing.queue-limit-stdev";
public static final float DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV = public static final float DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV =
@ -349,6 +354,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Min length of container queue at NodeManager. This is a cluster-wide /** Min length of container queue at NodeManager. This is a cluster-wide
* configuration that acts as the lower-bound of optimal queue length * configuration that acts as the lower-bound of optimal queue length
* calculated by the NodeQueueLoadMonitor */ * calculated by the NodeQueueLoadMonitor */
@Unstable
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
RM_PREFIX + "nm-container-queuing.min-queue-length"; RM_PREFIX + "nm-container-queuing.min-queue-length";
public static final int DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = 5; public static final int DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = 5;
@ -356,17 +362,20 @@ public static boolean isAclEnabled(Configuration conf) {
/** Max length of container queue at NodeManager. This is a cluster-wide /** Max length of container queue at NodeManager. This is a cluster-wide
* configuration that acts as the upper-bound of optimal queue length * configuration that acts as the upper-bound of optimal queue length
* calculated by the NodeQueueLoadMonitor */ * calculated by the NodeQueueLoadMonitor */
@Unstable
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
RM_PREFIX + "nm-container-queuing.max-queue-length"; RM_PREFIX + "nm-container-queuing.max-queue-length";
public static final int DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = 15; public static final int DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = 15;
/** Min queue wait time for a container at a NodeManager. */ /** Min queue wait time for a container at a NodeManager. */
@Unstable
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
RM_PREFIX + "nm-container-queuing.min-queue-wait-time-ms"; RM_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
public static final int DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = public static final int DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
10; 10;
/** Max queue wait time for a container queue at a NodeManager. */ /** Max queue wait time for a container queue at a NodeManager. */
@Unstable
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
RM_PREFIX + "nm-container-queuing.max-queue-wait-time-ms"; RM_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
public static final int DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = public static final int DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =

View File

@ -72,10 +72,10 @@ By default, allocation of opportunistic containers is performed centrally throug
The following command can be used to run a sample pi map-reduce job, executing 40% of mappers using opportunistic containers (substitute `3.0.0-alpha2-SNAPSHOT` below with the version of Hadoop you are using): The following command can be used to run a sample pi map-reduce job, executing 40% of mappers using opportunistic containers (substitute `3.0.0-alpha2-SNAPSHOT` below with the version of Hadoop you are using):
``` ```
$ hadoop jar hadoop-3.0.0-alpha2-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha2-SNAPSHOT.jar pi -Dmapreduce.job.num-opportunistic-maps-percentage="40" 50 100 $ hadoop jar hadoop-3.0.0-alpha2-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha2-SNAPSHOT.jar pi -Dmapreduce.job.num-opportunistic-maps-percent="40" 50 100
``` ```
By changing the value of `mapreduce.job.num-opportunistic-maps-percentage` in the above command, we can specify the percentage of mappers that can be executed through opportunistic containers. By changing the value of `mapreduce.job.num-opportunistic-maps-percent` in the above command, we can specify the percentage of mappers that can be executed through opportunistic containers.
###<a name="Opportunistic_Containers_in_Web_UI"></a>Opportunistic Containers in Web UI ###<a name="Opportunistic_Containers_in_Web_UI"></a>Opportunistic Containers in Web UI