From aa297b188e877af71dbcef9859a398c3eeda920a Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 3 Nov 2011 12:25:18 +0000 Subject: [PATCH] MAPREDUCE-3221. Reneabled the previously ignored test in TestSubmitJob and fixed bugs in it. Contributed by Devaraj K. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1197080 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/mapred/TestSubmitJob.java | 148 +++++++----------- 2 files changed, 56 insertions(+), 95 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5cb5fec264..f6040d3a80 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -66,6 +66,9 @@ Release 0.23.1 - Unreleased BUG FIXES + MAPREDUCE-3221. Reneabled the previously ignored test in TestSubmitJob + and fixed bugs in it. (Devaraj K via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java index c24ff38d5b..c32ead05b8 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java @@ -17,44 +17,33 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import java.io.DataOutputStream; import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.mapred.lib.IdentityMapper; -import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.SleepJob; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; /** @@ -68,42 +57,10 @@ import org.junit.Test; public class TestSubmitJob { static final Log LOG = LogFactory.getLog(TestSubmitJob.class); - private MiniMRCluster mrCluster; - - private MiniDFSCluster dfsCluster; - private JobTracker jt; - private FileSystem fs; private static Path TEST_DIR = new Path(System.getProperty("test.build.data","/tmp"), "job-submission-testing"); - private static int numSlaves = 1; - @Before - public void startCluster() throws Exception { - Configuration conf = new Configuration(); - dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); - JobConf jConf = new JobConf(conf); - jConf.setLong("mapred.job.submission.expiry.interval", 6 * 1000); - mrCluster = new MiniMRCluster(0, 0, numSlaves, - dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, - jConf); - jt = mrCluster.getJobTrackerRunner().getJobTracker(); - fs = FileSystem.get(mrCluster.createJobConf()); - } - - @After - public void stopCluster() throws Exception { - if (mrCluster != null) { - mrCluster.shutdown(); - mrCluster = null; - } - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } - jt = null; - fs = null; - } /** * Test to verify that jobs with invalid memory requirements are killed at the @@ -111,51 +68,53 @@ public class TestSubmitJob { * * @throws Exception */ + @SuppressWarnings("deprecation") @Test - public void testJobWithInvalidMemoryReqs() - throws Exception { - JobConf jtConf = new JobConf(); - jtConf - .setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L); - jtConf.setLong(MRConfig.REDUCEMEMORY_MB, - 2 * 1024L); - jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB, - 3 * 1024L); - jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, - 4 * 1024L); + public void testJobWithInvalidMemoryReqs() throws Exception { + MiniMRCluster mrCluster = null; + try { + JobConf jtConf = new JobConf(); + jtConf.setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L); + jtConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L); + jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 3 * 1024L); + jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 4 * 1024L); - mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf); + mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf); - JobConf clusterConf = mrCluster.createJobConf(); + JobConf clusterConf = mrCluster.createJobConf(); - // No map-memory configuration - JobConf jobConf = new JobConf(clusterConf); - jobConf.setMemoryForReduceTask(1 * 1024L); - runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L, - "Invalid job requirements."); + // No map-memory configuration + JobConf jobConf = new JobConf(clusterConf); + jobConf.setMemoryForReduceTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L, + "Invalid job requirements."); - // No reduce-memory configuration - jobConf = new JobConf(clusterConf); - jobConf.setMemoryForMapTask(1 * 1024L); - runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT, - "Invalid job requirements."); + // No reduce-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT, + "Invalid job requirements."); - // Invalid map-memory configuration - jobConf = new JobConf(clusterConf); - jobConf.setMemoryForMapTask(4 * 1024L); - jobConf.setMemoryForReduceTask(1 * 1024L); - runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L, - "Exceeds the cluster's max-memory-limit."); + // Invalid map-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(4 * 1024L); + jobConf.setMemoryForReduceTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L, + "Exceeds the cluster's max-memory-limit."); - // No reduce-memory configuration - jobConf = new JobConf(clusterConf); - jobConf.setMemoryForMapTask(1 * 1024L); - jobConf.setMemoryForReduceTask(5 * 1024L); - runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L, - "Exceeds the cluster's max-memory-limit."); - + // No reduce-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(1 * 1024L); + jobConf.setMemoryForReduceTask(5 * 1024L); + runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L, + "Exceeds the cluster's max-memory-limit."); + } finally { + if (mrCluster != null) + mrCluster.shutdown(); + } } + @SuppressWarnings("deprecation") private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks, long memForReduceTasks, String expectedMsg) throws Exception, @@ -180,6 +139,7 @@ public class TestSubmitJob { .contains(overallExpectedMsg)); } + @SuppressWarnings("deprecation") static ClientProtocol getJobSubmitClient(JobConf conf, UserGroupInformation ugi) throws IOException { @@ -188,24 +148,23 @@ public class TestSubmitJob { conf, NetUtils.getSocketFactory(conf, ClientProtocol.class)); } - static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient( + static ClientNamenodeWireProtocol getDFSClient( Configuration conf, UserGroupInformation ugi) throws IOException { - return (org.apache.hadoop.hdfs.protocol.ClientProtocol) - RPC.getProxy(org.apache.hadoop.hdfs.protocol.ClientProtocol.class, - org.apache.hadoop.hdfs.protocol.ClientProtocol.versionID, + return (ClientNamenodeWireProtocol) + RPC.getProxy(ClientNamenodeWireProtocol.class, + ClientNamenodeWireProtocol.versionID, NameNode.getAddress(conf), ugi, conf, NetUtils.getSocketFactory(conf, - org.apache.hadoop.hdfs.protocol.ClientProtocol.class)); + ClientNamenodeWireProtocol.class)); } /** * Submit a job and check if the files are accessible to other users. - * TODO fix testcase */ + @SuppressWarnings("deprecation") @Test - @Ignore public void testSecureJobExecution() throws Exception { LOG.info("Testing secure job submission/execution"); MiniMRCluster mr = null; @@ -227,7 +186,6 @@ public class TestSubmitJob { mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(), 1, null, null, MR_UGI); JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); - String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); // cleanup dfs.getFileSystem().delete(TEST_DIR, true); @@ -268,7 +226,7 @@ public class TestSubmitJob { UserGroupInformation user2 = TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false); JobConf conf_other = mr.createJobConf(); - org.apache.hadoop.hdfs.protocol.ClientProtocol client = + ClientNamenodeWireProtocol client = getDFSClient(conf_other, user2); // try accessing mapred.system.dir/jobid/*