diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index 3bf05420f0..ed49f82506 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -25,9 +25,11 @@ import java.net.Proxy; import java.net.URL; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapreduce.CustomJobEndNotifier; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.eclipse.jetty.util.log.Log; @@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable { protected int timeout; // Timeout (ms) on the connection and notification protected URL urlToNotify; //URL to notify read from the config protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification + // A custom notifier implementation + // (see org.apache.hadoop.mapreduce.CustomJobEndNotifier) + private String customJobEndNotifierClassName; /** * Parse the URL that needs to be notified of the end of the job, along @@ -84,6 +89,9 @@ public void setConf(Configuration conf) { proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY); + customJobEndNotifierClassName = StringUtils.stripToNull( + conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS)); + //Configure the proxy to use if its set. It should be set like //proxyType@proxyHostname:port if(proxyConf != null && !proxyConf.equals("") && @@ -115,11 +123,22 @@ public void setConf(Configuration conf) { public Configuration getConf() { return conf; } - + /** * Notify the URL just once. Use best effort. */ protected boolean notifyURLOnce() { + if (customJobEndNotifierClassName == null) { + return notifyViaBuiltInNotifier(); + } else { + return notifyViaCustomNotifier(); + } + } + + /** + * Uses a simple HttpURLConnection to do the Job end notification. + */ + private boolean notifyViaBuiltInNotifier() { boolean success = false; try { Log.getLog().info("Job end notification trying " + urlToNotify); @@ -145,6 +164,36 @@ protected boolean notifyURLOnce() { return success; } + /** + * Uses the custom Job end notifier class to do the Job end notification. + */ + private boolean notifyViaCustomNotifier() { + try { + Log.getLog().info("Will be using " + customJobEndNotifierClassName + + " for Job end notification"); + + final Class customJobEndNotifierClass = + Class.forName(customJobEndNotifierClassName) + .asSubclass(CustomJobEndNotifier.class); + final CustomJobEndNotifier customJobEndNotifier = + customJobEndNotifierClass.getDeclaredConstructor().newInstance(); + + boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf); + if (success) { + Log.getLog().info("Job end notification to " + urlToNotify + + " succeeded"); + } else { + Log.getLog().warn("Job end notification to " + urlToNotify + + " failed"); + } + return success; + } catch (Exception e) { + Log.getLog().warn("Job end notification to " + urlToNotify + + " failed", e); + return false; + } + } + /** * Notify a server of the completion of a submitted job. The user must have * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 5af79d6f73..1cd625551a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -31,6 +31,7 @@ import java.net.Proxy; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; import java.nio.channels.ClosedChannelException; import javax.servlet.ServletException; @@ -42,7 +43,9 @@ import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapreduce.CustomJobEndNotifier; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -299,6 +302,45 @@ public void testNotificationOnLastRetryUnregistrationFailure() server.stop(); } + @Test + public void testCustomNotifierClass() throws InterruptedException { + JobConf conf = new JobConf(); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, + "http://example.com?jobId=$jobId&jobStatus=$jobStatus"); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS, + CustomNotifier.class.getName()); + this.setConf(conf); + + JobReport jobReport = mock(JobReport.class); + JobId jobId = mock(JobId.class); + when(jobId.toString()).thenReturn("mock-Id"); + when(jobReport.getJobId()).thenReturn(jobId); + when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED); + + CustomNotifier.urlToNotify = null; + this.notify(jobReport); + final URL urlToNotify = CustomNotifier.urlToNotify; + + Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED", + urlToNotify.toString()); + } + + public static final class CustomNotifier implements CustomJobEndNotifier { + + /** + * Once notifyOnce was invoked we'll store the URL in this variable + * so we can assert on it. + */ + private static URL urlToNotify = null; + + @Override + public boolean notifyOnce(final URL url, final Configuration jobConf) { + urlToNotify = url; + return true; + } + + } + private static HttpServer2 startHttpServer() throws Exception { new File(System.getProperty( "build.webapps", "build/webapps") + "/test").mkdirs(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index c0dd6502f0..2334364abe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -1886,6 +1886,52 @@ public void setJobEndNotificationURI(String uri) { set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); } + /** + * Returns the class to be invoked in order to send a notification + * after the job has completed (success/failure). + * + * @return the fully-qualified name of the class which implements + * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the + * {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} + * property + * + * @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String) + * @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS + */ + public String getJobEndNotificationCustomNotifierClass() { + return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS); + } + + /** + * Sets the class to be invoked in order to send a notification after the job + * has completed (success/failure). + * + * A notification url still has to be set which will be passed to + * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce( + * java.net.URL, org.apache.hadoop.conf.Configuration)} + * along with the Job's conf. + * + * If this is set instead of using a simple HttpURLConnection + * we'll create a new instance of this class + * which should be an implementation of + * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}, + * and we'll invoke that. + * + * @param customNotifierClassName the fully-qualified name of the class + * which implements + * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} + * + * @see JobConf#setJobEndNotificationURI(java.lang.String) + * @see + * org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS + */ + public void setJobEndNotificationCustomNotifierClass( + String customNotifierClassName) { + + set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS, + customNotifierClassName); + } + /** * Get job-specific shared directory for use as scratch space * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java new file mode 100644 index 0000000000..e5a6b9f1df --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import org.apache.hadoop.conf.Configuration; + +import java.net.URL; + +/** + * An interface for implementing a custom Job end notifier. The built-in + * Job end notifier uses a simple HTTP connection to notify the Job end status. + * By implementing this interface and setting the + * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property + * in the map-reduce Job configuration you can have your own + * notification mechanism. For now this still only works with HTTP/HTTPS URLs, + * but by implementing this class you can choose how you want to make the + * notification itself. For example you can choose to use a custom + * HTTP library, or do a delegation token authentication, maybe set a + * custom SSL context on the connection, etc. This means you still have to set + * the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property + * in the Job's conf. + */ +public interface CustomJobEndNotifier { + + /** + * The implementation should try to do a Job end notification only once. + * + * See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS}, + * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS} + * and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly + * this method will be invoked. + * + * @param url the URL which needs to be notified + * (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL}) + * @param jobConf the map-reduce Job's configuration + * + * @return true if the notification was successful + */ + boolean notifyOnce(URL url, Configuration jobConf) throws Exception; + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 0e31e170b8..a90c58dd28 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -1105,6 +1105,9 @@ public interface MRJobConfig { public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL = "mapreduce.job.end-notification.max.retry.interval"; + String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS = + "mapreduce.job.end-notification.custom-notifier-class"; + public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT = 5000; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index c40bb0b19c..844c91c5ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1415,6 +1415,23 @@ + + mapreduce.job.end-notification.custom-notifier-class + A class to be invoked in order to send a notification after the + job has completed (success/failure). The class must implement + org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification + url still has to be set which will be passed to the notifyOnce + method of your implementation along with the Job's configuration. + If this is set instead of using a simple HttpURLConnection we'll + create a new instance of this class. For now this still only works + with HTTP/HTTPS URLs, but by implementing this class you can choose + how you want to make the notification itself. For example you can + choose to use a custom HTTP library, or do a delegation token + authentication, maybe set a custom SSL context on the connection, etc. + The class needs to have a no-arg constructor. + + + mapreduce.job.log4j-properties-file