From 84973d104917c0b8cbb187ee4f9868bbce967728 Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Mon, 27 Aug 2018 16:53:06 +0200 Subject: [PATCH] MAPREDUCE-6861. Add metrics tags for ShuffleClientMetrics. (Contributed by Zoltan Siegl) --- .../hadoop/mapreduce/task/reduce/Shuffle.java | 24 +++--- .../task/reduce/ShuffleClientMetrics.java | 43 ++++++++++- .../task/reduce/TestShuffleClientMetrics.java | 75 +++++++++++++++++++ 3 files changed, 129 insertions(+), 13 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 3382bbf843..1aad71d7db 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -37,7 +37,8 @@ @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable @SuppressWarnings({"unchecked", "rawtypes"}) -public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter { +public class Shuffle implements ShuffleConsumerPlugin, + ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MIN_EVENTS_TO_FETCH = 100; @@ -51,7 +52,7 @@ public class Shuffle implements ShuffleConsumerPlugin, ExceptionRepo private ShuffleClientMetrics metrics; private TaskUmbilicalProtocol umbilical; - private ShuffleSchedulerImpl scheduler; + private ShuffleSchedulerImpl 
scheduler; private MergeManager merger; private Throwable throwable = null; private String throwingThreadName = null; @@ -68,7 +69,8 @@ public void init(ShuffleConsumerPlugin.Context context) { this.jobConf = context.getJobConf(); this.umbilical = context.getUmbilical(); this.reporter = context.getReporter(); - this.metrics = ShuffleClientMetrics.create(); + this.metrics = ShuffleClientMetrics.create(context.getReduceId(), + this.jobConf); this.copyPhase = context.getCopyPhase(); this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); @@ -101,16 +103,16 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); // Start the map-completion events fetcher thread - final EventFetcher eventFetcher = - new EventFetcher(reduceId, umbilical, scheduler, this, - maxEventsToFetch); + final EventFetcher eventFetcher = + new EventFetcher(reduceId, umbilical, scheduler, this, + maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads boolean isLocal = localMapFiles != null; final int numFetchers = isLocal ? 
1 : - jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); - Fetcher[] fetchers = new Fetcher[numFetchers]; + jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); + Fetcher[] fetchers = new Fetcher[numFetchers]; if (isLocal) { fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret(), @@ -118,7 +120,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { fetchers[0].start(); } else { for (int i=0; i < numFetchers; ++i) { - fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, + fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); fetchers[i].start(); @@ -141,7 +143,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { eventFetcher.shutDown(); // Stop the map-output fetcher threads - for (Fetcher fetcher : fetchers) { + for (Fetcher fetcher : fetchers) { fetcher.shutDown(); } @@ -157,7 +159,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { try { kvIter = merger.close(); } catch (Throwable e) { - throw new ShuffleError("Error while doing final merge " , e); + throw new ShuffleError("Error while doing final merge ", e); } // Sanity check diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java index d4e185df6f..d5e97aabc2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java @@ -17,24 +17,42 @@ 
*/ package org.apache.hadoop.mapreduce.task.reduce; +import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; + import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + import java.util.concurrent.ThreadLocalRandom; +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metric for Shuffle client. + */ +@SuppressWarnings("checkstyle:finalclass") @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable @Metrics(name="ShuffleClientMetrics", context="mapred") public class ShuffleClientMetrics { + private static final MetricsInfo RECORD_INFO = + info("ShuffleClientMetrics", "Metrics for Shuffle client"); + @Metric private MutableCounterInt numFailedFetches; @Metric @@ -44,14 +62,23 @@ public class ShuffleClientMetrics { @Metric private MutableGaugeInt numThreadsBusy; + private final MetricsRegistry metricsRegistry = + new MetricsRegistry(RECORD_INFO); + private ShuffleClientMetrics() { } - public static ShuffleClientMetrics create() { + public static ShuffleClientMetrics create( + TaskAttemptID reduceId, + JobConf jobConf) { MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker"); + + ShuffleClientMetrics shuffleClientMetrics = new ShuffleClientMetrics(); + shuffleClientMetrics.addTags(reduceId, jobConf); + return ms.register("ShuffleClientMetrics-" + 
ThreadLocalRandom.current().nextInt(), null, - new ShuffleClientMetrics()); + shuffleClientMetrics); } public void inputBytes(long bytes) { @@ -69,4 +96,16 @@ public void threadBusy() { public void threadFree() { numThreadsBusy.decr(); } + + private void addTags(TaskAttemptID reduceId, JobConf jobConf) { + metricsRegistry.tag("user", "", jobConf.getUser()) + .tag("jobName", "", jobConf.getJobName()) + .tag("jobId", "", reduceId.getJobID().toString()) + .tag("taskId", "", reduceId.toString()); + } + + @VisibleForTesting + MetricsRegistry getMetricsRegistry() { + return metricsRegistry; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java new file mode 100644 index 0000000000..0baf52fb19 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.metrics2.MetricsTag; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit test for {@link ShuffleClientMetrics}. + */ +public class TestShuffleClientMetrics { + + private static final String TEST_JOB_NAME = "Test job name"; + private static final String TEST_JOB_ID = "Test job id"; + private static final String TEST_TASK_ID = "Test task id"; + private static final String TEST_USER_NAME = "Test user name"; + + @Test + public void testShuffleMetricsTags() { + // Set up + JobID jobID = mock(JobID.class); + when(jobID.toString()).thenReturn(TEST_JOB_ID); + + TaskAttemptID reduceId = mock(TaskAttemptID.class); + when(reduceId.getJobID()).thenReturn(jobID); + when(reduceId.toString()).thenReturn(TEST_TASK_ID); + + JobConf jobConf = mock(JobConf.class); + when(jobConf.getUser()).thenReturn(TEST_USER_NAME); + when(jobConf.getJobName()).thenReturn(TEST_JOB_NAME); + + // Act + ShuffleClientMetrics shuffleClientMetrics = + ShuffleClientMetrics.create(reduceId, jobConf); + + // Assert + MetricsTag userMetrics = shuffleClientMetrics.getMetricsRegistry() + .getTag("user"); + assertEquals(TEST_USER_NAME, userMetrics.value()); + + MetricsTag jobNameMetrics = shuffleClientMetrics.getMetricsRegistry() + .getTag("jobName"); + assertEquals(TEST_JOB_NAME, jobNameMetrics.value()); + + MetricsTag jobIdMetrics = shuffleClientMetrics.getMetricsRegistry() + .getTag("jobId"); + assertEquals(TEST_JOB_ID, jobIdMetrics.value()); + + MetricsTag taskIdMetrics = shuffleClientMetrics.getMetricsRegistry() + .getTag("taskId"); + assertEquals(TEST_TASK_ID, taskIdMetrics.value()); + } +} \ No newline at end of file