From af61f4ae15adf3bf6c863945f8c8e3ea7b12320c Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 21 Sep 2011 09:13:34 +0000 Subject: [PATCH] MAPREDUCE-2970. Fixed NPEs in corner cases with different configurations for mapreduce.framework.name. Contributed by Venu Gopala Rao. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173534 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../org/apache/hadoop/mapreduce/Cluster.java | 37 ++++--- .../TestYarnClientProtocolProvider.java | 59 +++++++++++ .../JobTrackerClientProtocolProvider.java | 14 ++- .../mapred/LocalClientProtocolProvider.java | 9 +- .../TestClientProtocolProviderImpls.java | 99 +++++++++++++++++++ 6 files changed, 201 insertions(+), 20 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java create mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7f50b75fc3..36a5f24c03 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1384,6 +1384,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3023. Fixed clients to display queue state correctly. (Ravi Prakash via acmurthy) + MAPREDUCE-2970. Fixed NPEs in corner cases with different configurations + for mapreduce.framework.name. (Venu Gopala Rao via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index 5112c86e7b..33d5f81b4f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -41,8 +41,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; /** * Provides a way to access information about the map/reduce cluster. @@ -68,30 +68,41 @@ public static enum JobTrackerStatus {INITIALIZING, RUNNING}; } public Cluster(Configuration conf) throws IOException { - this.conf = conf; - this.ugi = UserGroupInformation.getCurrentUser(); - for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) { - ClientProtocol clientProtocol = provider.create(conf); - if (clientProtocol != null) { - clientProtocolProvider = provider; - client = clientProtocol; - break; - } - } + this(null, conf); } public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); - for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) { - ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf); + initialize(jobTrackAddr, conf); + } + + private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) + throws IOException { + + for (ClientProtocolProvider provider : ServiceLoader + .load(ClientProtocolProvider.class)) { + ClientProtocol clientProtocol = null; + if (jobTrackAddr == null) { + clientProtocol = provider.create(conf); + } else { + clientProtocol = provider.create(jobTrackAddr, conf); + } + if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; break; } } + + if (null == clientProtocolProvider || null == client) { + throw new IOException( + "Cannot initialize Cluster. Please check your configuration for " + + MRConfig.FRAMEWORK_NAME + + " and the correspond server addresses."); + } } ClientProtocol getClient() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java new file mode 100644 index 0000000000..2bc9030bf8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.junit.Test; + +public class TestYarnClientProtocolProvider extends TestCase { + + @Test + public void testClusterWithYarnClientProtocolProvider() throws Exception { + + Configuration conf = new Configuration(false); + Cluster cluster = null; + + try { + cluster = new Cluster(conf); + fail("Cluster should not be initialized with out any framework name"); + } catch (IOException e) { + + } + + try { + conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + cluster = new Cluster(conf); + ClientProtocol client = cluster.getClient(); + assertTrue(client instanceof YARNRunner); + } catch (IOException e) { + + } finally { + if (cluster != null) { + cluster.close(); + } + } + } +} diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java index 42c958d77c..d12132c68d 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java @@ -43,20 +43,24 @@ public ClientProtocol create(Configuration conf) throws IOException { String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local"); if (!"local".equals(tracker)) { return createRPCProxy(JobTracker.getAddress(conf), conf); + } else { + throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS + + "\" configuration value for JobTracker: \"" + + tracker + "\""); } - return null; } @Override - public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { + public ClientProtocol create(InetSocketAddress addr, Configuration conf) + throws IOException { return createRPCProxy(addr, conf); } - + private ClientProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (ClientProtocol) RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), - conf, NetUtils.getSocketFactory(conf, ClientProtocol.class)); + ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), + conf, NetUtils.getSocketFactory(conf, ClientProtocol.class)); } @Override diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java index 68d10bc4d0..d09b222ee9 100644 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java +++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java @@ -37,11 +37,16 @@ public ClientProtocol create(Configuration conf) throws IOException { if (framework != null && !framework.equals("local")) { return null; } - if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) { + String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local"); + if ("local".equals(tracker)) { conf.setInt("mapreduce.job.maps", 1); return new LocalJobRunner(conf); + } else { + + throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS + + "\" configuration value for LocalJobRunner : \"" + + tracker + "\""); } - return null; } @Override diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java new file mode 100644 index 0000000000..a9044e2430 --- /dev/null +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.LocalJobRunner; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.junit.Test; + +public class TestClientProtocolProviderImpls extends TestCase { + + @Test + public void testClusterWithLocalClientProvider() throws Exception { + + Configuration conf = new Configuration(); + + try { + conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); + new Cluster(conf); + fail("Cluster should not be initialized with incorrect framework name"); + } catch (IOException e) { + + } + + try { + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); + + new Cluster(conf); + fail("Cluster with Local Framework name should use local JT address"); + } catch (IOException e) { + + } + + try { + conf.set(JTConfig.JT_IPC_ADDRESS, "local"); + Cluster cluster = new Cluster(conf); + assertTrue(cluster.getClient() instanceof LocalJobRunner); + cluster.close(); + } catch (IOException e) { + + } + } + + @Test + public void testClusterWithJTClientProvider() throws Exception { + + Configuration conf = new Configuration(); + try { + conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); + new Cluster(conf); + fail("Cluster should not be initialized with incorrect framework name"); + + } catch (IOException e) { + + } + + try { + conf.set(MRConfig.FRAMEWORK_NAME, "classic"); + conf.set(JTConfig.JT_IPC_ADDRESS, "local"); + new Cluster(conf); + fail("Cluster with classic Framework name shouldnot use local JT address"); + + } catch (IOException e) { + + } + + try { + conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, "classic"); + conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); + Cluster cluster = new Cluster(conf); + cluster.close(); + } catch (IOException e) { + + } + } + +}