diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 473ea7dd57..14e6147317 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -233,6 +233,9 @@ Release 2.0.0 - UNRELEASED MAPREDUCE-4108. Fix tests in org.apache.hadoop.util.TestRunJar (Devaraj K via tgraves) + MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory + (Devaraj K via tgraves) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java index 87ab4e0cfd..48e76f42ed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java @@ -22,42 +22,96 @@ import java.net.Socket; import java.net.SocketAddress; -import junit.framework.TestCase; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.net.StandardSocketFactory; -import org.junit.Ignore; +import org.junit.Assert; +import org.junit.Test; /** * This class checks that RPCs can use specialized socket factories. */ -@Ignore -public class TestSocketFactory extends TestCase { +public class TestSocketFactory { /** - * Check that we can reach a NameNode or a JobTracker using a specific + * Check that we can reach a NameNode or Resource Manager using a specific * socket factory */ + @Test public void testSocketFactory() throws IOException { // Create a standard mini-cluster Configuration sconf = new Configuration(); - MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1) + .build(); final int nameNodePort = cluster.getNameNodePort(); // Get a reference to its DFS directly FileSystem fs = cluster.getFileSystem(); - assertTrue(fs instanceof DistributedFileSystem); + Assert.assertTrue(fs instanceof DistributedFileSystem); DistributedFileSystem directDfs = (DistributedFileSystem) fs; + Configuration cconf = getCustomSocketConfigs(nameNodePort); + + fs = FileSystem.get(cconf); + Assert.assertTrue(fs instanceof DistributedFileSystem); + DistributedFileSystem dfs = (DistributedFileSystem) fs; + + JobClient client = null; + MiniMRYarnCluster miniMRYarnCluster = null; + try { + // This will test RPC to the NameNode only. + // could we test Client-DataNode connections? + Path filePath = new Path("/dir"); + + Assert.assertFalse(directDfs.exists(filePath)); + Assert.assertFalse(dfs.exists(filePath)); + + directDfs.mkdirs(filePath); + Assert.assertTrue(directDfs.exists(filePath)); + Assert.assertTrue(dfs.exists(filePath)); + + // This will test RPC to a Resource Manager + fs = FileSystem.get(sconf); + JobConf jobConf = new JobConf(); + FileSystem.setDefaultUri(jobConf, fs.getUri().toString()); + miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf); + JobConf jconf = new JobConf(cconf); + jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + String rmAddress = jconf.get("yarn.resourcemanager.address"); + String[] split = rmAddress.split(":"); + jconf.set("yarn.resourcemanager.address", split[0] + ':' + + (Integer.parseInt(split[1]) + 10)); + client = new JobClient(jconf); + + JobStatus[] jobs = client.jobsToComplete(); + Assert.assertTrue(jobs.length == 0); + + } finally { + closeClient(client); + closeDfs(dfs); + closeDfs(directDfs); + stopMiniMRYarnCluster(miniMRYarnCluster); + shutdownDFSCluster(cluster); + } + } + + private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) { + MiniMRYarnCluster miniMRYarnCluster; + miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1); + miniMRYarnCluster.init(jobConf); + miniMRYarnCluster.start(); + return miniMRYarnCluster; + } + + private Configuration getCustomSocketConfigs(final int nameNodePort) { // Get another reference via network using a specific socket factory Configuration cconf = new Configuration(); FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/", @@ -68,78 +122,49 @@ public void testSocketFactory() throws IOException { "org.apache.hadoop.ipc.DummySocketFactory"); cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol", "org.apache.hadoop.ipc.DummySocketFactory"); + return cconf; + } - fs = FileSystem.get(cconf); - assertTrue(fs instanceof DistributedFileSystem); - DistributedFileSystem dfs = (DistributedFileSystem) fs; - - JobClient client = null; - MiniMRCluster mr = null; + private void shutdownDFSCluster(MiniDFSCluster cluster) { try { - // This will test RPC to the NameNode only. - // could we test Client-DataNode connections? - Path filePath = new Path("/dir"); + if (cluster != null) + cluster.shutdown(); - assertFalse(directDfs.exists(filePath)); - assertFalse(dfs.exists(filePath)); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } - directDfs.mkdirs(filePath); - assertTrue(directDfs.exists(filePath)); - assertTrue(dfs.exists(filePath)); + private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) { + try { + if (miniMRYarnCluster != null) + miniMRYarnCluster.stop(); - // This will test TPC to a JobTracker - fs = FileSystem.get(sconf); - mr = new MiniMRCluster(1, fs.getUri().toString(), 1); - final int jobTrackerPort = mr.getJobTrackerPort(); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } - JobConf jconf = new JobConf(cconf); - jconf.set("mapred.job.tracker", String.format("localhost:%d", - jobTrackerPort + 10)); - jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); - client = new JobClient(jconf); + private void closeDfs(DistributedFileSystem dfs) { + try { + if (dfs != null) + dfs.close(); - JobStatus[] jobs = client.jobsToComplete(); - assertTrue(jobs.length == 0); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } - } finally { - try { - if (client != null) - client.close(); - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (dfs != null) - dfs.close(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (directDfs != null) - directDfs.close(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (cluster != null) - cluster.shutdown(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - if (mr != null) { - try { - mr.shutdown(); - } catch (Exception ignored) { - ignored.printStackTrace(); - } - } + private void closeClient(JobClient client) { + try { + if (client != null) + client.close(); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); } } } @@ -155,32 +180,27 @@ class DummySocketFactory extends StandardSocketFactory { public DummySocketFactory() { } - /* @inheritDoc */ @Override public Socket createSocket() throws IOException { return new Socket() { @Override - public void connect(SocketAddress addr, int timeout) - throws IOException { + public void connect(SocketAddress addr, int timeout) throws IOException { assert (addr instanceof InetSocketAddress); InetSocketAddress iaddr = (InetSocketAddress) addr; SocketAddress newAddr = null; if (iaddr.isUnresolved()) - newAddr = - new InetSocketAddress(iaddr.getHostName(), - iaddr.getPort() - 10); + newAddr = new InetSocketAddress(iaddr.getHostName(), + iaddr.getPort() - 10); else - newAddr = - new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10); - System.out.printf("Test socket: rerouting %s to %s\n", iaddr, - newAddr); + newAddr = new InetSocketAddress(iaddr.getAddress(), + iaddr.getPort() - 10); + System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr); super.connect(newAddr, timeout); } }; } - /* @inheritDoc */ @Override public boolean equals(Object obj) { if (this == obj) @@ -191,11 +211,4 @@ public boolean equals(Object obj) { return false; return true; } - - /* @inheritDoc */ - @Override - public int hashCode() { - // Dummy hash code (to make find bugs happy) - return 53; - } }