MAPREDUCE-2970. Fixed NPEs in corner cases with different configurations for mapreduce.framework.name. Contributed by Venu Gopala Rao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173534 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-21 09:13:34 +00:00
parent 01fbb0fb45
commit af61f4ae15
6 changed files with 201 additions and 20 deletions

View File

@ -1384,6 +1384,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3023. Fixed clients to display queue state correctly. (Ravi
Prakash via acmurthy)
MAPREDUCE-2970. Fixed NPEs in corner cases with different configurations
for mapreduce.framework.name. (Venu Gopala Rao via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -41,8 +41,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
/**
* Provides a way to access information about the map/reduce cluster.
@ -68,30 +68,41 @@ public static enum JobTrackerStatus {INITIALIZING, RUNNING};
}
public Cluster(Configuration conf) throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
ClientProtocol clientProtocol = provider.create(conf);
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
break;
}
}
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
initialize(jobTrackAddr, conf);
}
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
for (ClientProtocolProvider provider : ServiceLoader
.load(ClientProtocolProvider.class)) {
ClientProtocol clientProtocol = null;
if (jobTrackAddr == null) {
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
break;
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
ClientProtocol getClient() {

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.junit.Test;
public class TestYarnClientProtocolProvider extends TestCase {
@Test
public void testClusterWithYarnClientProtocolProvider() throws Exception {
Configuration conf = new Configuration(false);
Cluster cluster = null;
try {
cluster = new Cluster(conf);
fail("Cluster should not be initialized with out any framework name");
} catch (IOException e) {
}
try {
conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
cluster = new Cluster(conf);
ClientProtocol client = cluster.getClient();
assertTrue(client instanceof YARNRunner);
} catch (IOException e) {
} finally {
if (cluster != null) {
cluster.close();
}
}
}
}

View File

@ -43,20 +43,24 @@ public ClientProtocol create(Configuration conf) throws IOException {
String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
if (!"local".equals(tracker)) {
return createRPCProxy(JobTracker.getAddress(conf), conf);
} else {
throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+ "\" configuration value for JobTracker: \""
+ tracker + "\"");
}
return null;
}
@Override
public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
public ClientProtocol create(InetSocketAddress addr, Configuration conf)
throws IOException {
return createRPCProxy(addr, conf);
}
private ClientProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
@Override

View File

@ -37,11 +37,16 @@ public ClientProtocol create(Configuration conf) throws IOException {
if (framework != null && !framework.equals("local")) {
return null;
}
if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
if ("local".equals(tracker)) {
conf.setInt("mapreduce.job.maps", 1);
return new LocalJobRunner(conf);
} else {
throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+ "\" configuration value for LocalJobRunner : \""
+ tracker + "\"");
}
return null;
}
@Override

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.Test;
public class TestClientProtocolProviderImpls extends TestCase {
@Test
public void testClusterWithLocalClientProvider() throws Exception {
Configuration conf = new Configuration();
try {
conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
new Cluster(conf);
fail("Cluster should not be initialized with incorrect framework name");
} catch (IOException e) {
}
try {
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
new Cluster(conf);
fail("Cluster with Local Framework name should use local JT address");
} catch (IOException e) {
}
try {
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
Cluster cluster = new Cluster(conf);
assertTrue(cluster.getClient() instanceof LocalJobRunner);
cluster.close();
} catch (IOException e) {
}
}
@Test
public void testClusterWithJTClientProvider() throws Exception {
Configuration conf = new Configuration();
try {
conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
new Cluster(conf);
fail("Cluster should not be initialized with incorrect framework name");
} catch (IOException e) {
}
try {
conf.set(MRConfig.FRAMEWORK_NAME, "classic");
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
new Cluster(conf);
fail("Cluster with classic Framework name shouldnot use local JT address");
} catch (IOException e) {
}
try {
conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, "classic");
conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
Cluster cluster = new Cluster(conf);
cluster.close();
} catch (IOException e) {
}
}
}