diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index 28d4619129..ee6fba9e05 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -37,6 +37,9 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a
+ ServiceProvider for the actual implementation. (tomwhite via acmurthy)
+
MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to
diff --git a/mapreduce/build.xml b/mapreduce/build.xml
index 8474e795a5..c2ad71ffdd 100644
--- a/mapreduce/build.xml
+++ b/mapreduce/build.xml
@@ -416,6 +416,7 @@
+
diff --git a/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000000..1a54e30048
--- /dev/null
+++ b/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,15 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
+org.apache.hadoop.mapred.LocalClientProtocolProvider
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java
new file mode 100644
index 0000000000..42c958d77c
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+@InterfaceAudience.Private
+public class JobTrackerClientProtocolProvider extends ClientProtocolProvider {
+
+ @Override
+ public ClientProtocol create(Configuration conf) throws IOException {
+ String framework = conf.get(MRConfig.FRAMEWORK_NAME);
+ if (framework != null && !framework.equals("classic")) {
+ return null;
+ }
+ String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+ if (!"local".equals(tracker)) {
+ return createRPCProxy(JobTracker.getAddress(conf), conf);
+ }
+ return null;
+ }
+
+ @Override
+ public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+ return createRPCProxy(addr, conf);
+ }
+
+ private ClientProtocol createRPCProxy(InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
+ conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
+ }
+
+ @Override
+ public void close(ClientProtocol clientProtocol) throws IOException {
+ RPC.stopProxy(clientProtocol);
+ }
+
+}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
new file mode 100644
index 0000000000..68d10bc4d0
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+@InterfaceAudience.Private
+public class LocalClientProtocolProvider extends ClientProtocolProvider {
+
+ @Override
+ public ClientProtocol create(Configuration conf) throws IOException {
+ String framework = conf.get(MRConfig.FRAMEWORK_NAME);
+ if (framework != null && !framework.equals("local")) {
+ return null;
+ }
+ if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
+ conf.setInt("mapreduce.job.maps", 1);
+ return new LocalJobRunner(conf);
+ }
+ return null;
+ }
+
+ @Override
+ public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
+ return null; // LocalJobRunner doesn't use a socket
+ }
+
+ @Override
+ public void close(ClientProtocol clientProtocol) {
+ // no clean up required
+ }
+
+}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java
index 1f4067b9e6..fcc0ee1c63 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java
@@ -23,6 +23,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
+import java.util.ServiceLoader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -30,14 +31,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
-import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.net.NetUtils;
@@ -56,6 +56,7 @@ public class Cluster {
@InterfaceStability.Evolving
public static enum JobTrackerStatus {INITIALIZING, RUNNING};
+ private ClientProtocolProvider clientProtocolProvider;
private ClientProtocol client;
private UserGroupInformation ugi;
private Configuration conf;
@@ -71,35 +72,30 @@ public static enum JobTrackerStatus {INITIALIZING, RUNNING};
public Cluster(Configuration conf) throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- client = createClient(conf);
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+ ClientProtocol clientProtocol = provider.create(conf);
+ if (clientProtocol != null) {
+ clientProtocolProvider = provider;
+ client = clientProtocol;
+ break;
+ }
+ }
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- client = createRPCProxy(jobTrackAddr, conf);
- }
-
- private ClientProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, addr, ugi, conf,
- NetUtils.getSocketFactory(conf, ClientProtocol.class));
- }
-
- private ClientProtocol createClient(Configuration conf) throws IOException {
- ClientProtocol client;
- String tracker = conf.get("mapreduce.jobtracker.address", "local");
- if ("local".equals(tracker)) {
- conf.setInt("mapreduce.job.maps", 1);
- client = new LocalJobRunner(conf);
- } else {
- client = createRPCProxy(JobTracker.getAddress(conf), conf);
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+ ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
+ if (clientProtocol != null) {
+ clientProtocolProvider = provider;
+ client = clientProtocol;
+ break;
+ }
}
- return client;
}
-
+
ClientProtocol getClient() {
return client;
}
@@ -112,9 +108,7 @@ Configuration getConf() {
* Close the Cluster
.
*/
public synchronized void close() throws IOException {
- if (!(client instanceof LocalJobRunner)) {
- RPC.stopProxy(client);
- }
+ clientProtocolProvider.close(client);
}
private Job[] getJobs(JobStatus[] stats) throws IOException {
@@ -353,7 +347,8 @@ public long getTaskTrackerExpiryInterval() throws IOException,
getDelegationToken(Text renewer) throws IOException, InterruptedException{
Token result =
client.getDelegationToken(renewer);
- InetSocketAddress addr = JobTracker.getAddress(conf);
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ conf.get(JTConfig.JT_IPC_ADDRESS, "localhost:8012"));
StringBuilder service = new StringBuilder();
service.append(NetUtils.normalizeHostName(addr.getAddress().
getHostAddress()));
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java
index 39df8b8e05..9e88ea182e 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -57,4 +57,6 @@ public interface MRConfig {
"mapreduce.cluster.delegation.token.max-lifetime";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
+
+ public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java
new file mode 100644
index 0000000000..a9b4233448
--- /dev/null
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.protocol;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+@InterfaceAudience.Private
+public abstract class ClientProtocolProvider {
+
+ public abstract ClientProtocol create(Configuration conf) throws IOException;
+
+ public abstract ClientProtocol create(InetSocketAddress addr,
+ Configuration conf) throws IOException;
+
+ public abstract void close(ClientProtocol clientProtocol) throws IOException;
+
+}