From 355ba013747637e71936eab499055446616ed9d3 Mon Sep 17 00:00:00 2001
From: Mahadev Konar
Date: Sat, 21 Jan 2012 01:15:24 +0000
Subject: [PATCH] MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1234227 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |    3 +
 hadoop-mapreduce-project/build.xml            |    2 -
 hadoop-mapreduce-project/ivy.xml              |    2 +
 .../src/contrib/gridmix/ivy.xml               |    2 +
 .../tools/rumen/TestConcurrentRead.java       |  136 --
 .../hadoop/tools/rumen/TestParsedLine.java    |  105 -
 .../tools/rumen/TestRumenAnonymization.java   | 1940 -----------------
 .../hadoop/tools/rumen/TestRumenFolder.java   |  196 --
 .../tools/rumen/TestRumenJobTraces.java       | 1259 -----------
 .../hadoop/tools/rumen/TestZombieJob.java     |  338 ---
 10 files changed, 7 insertions(+), 3976 deletions(-)
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenAnonymization.java
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
 delete mode 100644 hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index dc6220e80a..a7a73340da 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -526,6 +526,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3549. write api documentation for web service apis for RM, NM,
     mapreduce app master, and job history server (Thomas Graves via mahadev)
 
+    MAPREDUCE-3705. ant build fails on 0.23 branch.
(Thomas Graves via + mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/build.xml b/hadoop-mapreduce-project/build.xml index 9df60e98bb..2eb5459be4 100644 --- a/hadoop-mapreduce-project/build.xml +++ b/hadoop-mapreduce-project/build.xml @@ -575,8 +575,6 @@ - - diff --git a/hadoop-mapreduce-project/ivy.xml b/hadoop-mapreduce-project/ivy.xml index e04da7019b..95042252e9 100644 --- a/hadoop-mapreduce-project/ivy.xml +++ b/hadoop-mapreduce-project/ivy.xml @@ -99,6 +99,8 @@ rev="${yarn.version}" conf="compile->default"> + diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/ivy.xml b/hadoop-mapreduce-project/src/contrib/gridmix/ivy.xml index 4ab7b62065..d587a7b875 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/ivy.xml +++ b/hadoop-mapreduce-project/src/contrib/gridmix/ivy.xml @@ -70,6 +70,8 @@ + cachedTrace = new ArrayList(); - static final String traceFile = - "rumen/small-trace-test/job-tracker-logs-trace-output.gz"; - - static Configuration conf; - static FileSystem lfs; - static Path path; - - @BeforeClass - static public void globalSetUp() throws IOException { - conf = new Configuration(); - lfs = FileSystem.getLocal(conf); - Path rootInputDir = new Path(System.getProperty("test.tools.input.dir", "")) - .makeQualified(lfs.getUri(), lfs.getWorkingDirectory()); - path = new Path(rootInputDir, traceFile); - JobTraceReader reader = new JobTraceReader(path, conf); - try { - LoggedJob job; - while ((job = reader.getNext()) != null) { - cachedTrace.add(job); - } - } finally { - reader.close(); - } - } - - void readAndCompare() throws IOException { - JobTraceReader reader = new JobTraceReader(path, conf); - try { - for (Iterator it = cachedTrace.iterator(); it.hasNext();) { - LoggedJob jobExpected = it.next(); - LoggedJob jobRead = reader.getNext(); - assertNotNull(jobRead); - try { - jobRead.deepCompare(jobExpected, null); - } catch (DeepInequalityException e) { - fail(e.toString()); - } - } - assertNull(reader.getNext()); - } finally { - reader.close(); - } - } - - class TestThread extends Thread { - final int repeat; - final CountDownLatch startSignal, doneSignal; - final Map errors; - - TestThread(int id, int repeat, CountDownLatch startSignal, CountDownLatch doneSignal, Map errors) { - super(String.format("TestThread-%d", id)); - this.repeat = repeat; - this.startSignal = startSignal; - this.doneSignal = doneSignal; - this.errors = errors; - } - - @Override - public void run() { - try { - startSignal.await(); - for (int i = 0; i < repeat; ++i) { - try { - readAndCompare(); - } catch (Throwable e) { - errors.put(getName(), e); - break; - } - } - doneSignal.countDown(); - } catch (Throwable e) { - errors.put(getName(), e); - } - } - } - - @Test - public void testConcurrentRead() throws InterruptedException { - int nThr = conf.getInt("test.rumen.concurrent-read.threads", 4); - int repeat = conf.getInt("test.rumen.concurrent-read.repeat", 10); - CountDownLatch startSignal = new CountDownLatch(1); - CountDownLatch doneSignal = new CountDownLatch(nThr); - Map errors = Collections - .synchronizedMap(new TreeMap()); - for (int i = 0; i < nThr; ++i) { - new TestThread(i, repeat, startSignal, doneSignal, errors).start(); - } - startSignal.countDown(); - doneSignal.await(); - if (!errors.isEmpty()) { - StringBuilder sb = new StringBuilder(); - for (Map.Entry e : errors.entrySet()) { - sb.append(String.format("%s:\n%s\n", e.getKey(), e.getValue().toString())); - } - fail(sb.toString()); - } - } -} diff --git 
a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java deleted file mode 100644 index 446484869c..0000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.tools.rumen; - -import org.apache.hadoop.util.StringUtils; -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestParsedLine { - static final char[] CHARS_TO_ESCAPE = new char[]{'=', '"', '.'}; - - String buildLine(String type, String[] kvseq) { - StringBuilder sb = new StringBuilder(); - sb.append(type); - for (int i=0; i defaultSerializer = new DefaultRumenSerializer(); - - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - // test username - UserName uname = new UserName("bob"); - assertEquals("Username error!", "bob", uname.getValue()); - - // test username serialization - // test with no anonymization - // test bob - testSerializer(new UserName("bob"), "bob", defaultSerializer); - // test alice - testSerializer(new UserName("alice"), "alice", defaultSerializer); - - // test user-name serialization - // test with anonymization - // test bob - testSerializer(new UserName("bob"), "user0", anonymizingSerializer); - // test alice - testSerializer(new UserName("alice"), "user1", anonymizingSerializer); - } - - /** - * Test {@link JobName}, serialization and anonymization. - */ - @Test - public void testJobNameSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - - // test jobname - JobName jname = new JobName("job-secret"); - assertEquals("Jobname error!", "job-secret", jname.getValue()); - - // test job-name serialization - // test with no anonymization - // test job1 - testSerializer(new JobName("job-myjob"), "job-myjob", defaultSerializer); - // test job2 - testSerializer(new JobName("job-yourjob"), "job-yourjob", - defaultSerializer); - - // test job-name serialization - // test with anonymization - // test queue1 - testSerializer(new JobName("secret-job-1"), "job0", anonymizingSerializer); - // test queue2 - testSerializer(new JobName("secret-job-2"), "job1", anonymizingSerializer); - } - - /** - * Test {@link QueueName}, serialization and anonymization. 
- */ - @Test - public void testQueueNameSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - - // test queuename - QueueName qname = new QueueName("queue-secret"); - assertEquals("Queuename error!", "queue-secret", qname.getValue()); - - // test queuename serialization - // test with no anonymization - // test queue1 - testSerializer(new QueueName("project1-queue"), - "project1-queue", defaultSerializer); - // test queue2 - testSerializer(new QueueName("project2-queue"), - "project2-queue", defaultSerializer); - - // test queue-name serialization - // test with anonymization - // test queue1 - testSerializer(new QueueName("project1-queue"), - "queue0", anonymizingSerializer); - // test queue2 - testSerializer(new QueueName("project2-queue"), - "queue1", anonymizingSerializer); - } - - /** - * Test {@link NodeName}. - */ - @Test - public void testNodeNameDataType() throws IOException { - // test hostname - // test only hostname - NodeName hname = new NodeName("host1.myorg.com"); - assertNull("Expected missing rack name", hname.getRackName()); - assertEquals("Hostname's test#1 hostname error!", - "host1.myorg.com", hname.getHostName()); - assertEquals("Hostname test#1 error!", "host1.myorg.com", hname.getValue()); - - // test rack/hostname - hname = new NodeName("/rack1.myorg.com/host1.myorg.com"); - assertEquals("Hostname's test#2 rackname error!", - "rack1.myorg.com", hname.getRackName()); - assertEquals("Hostname test#2 hostname error!", - "host1.myorg.com", hname.getHostName()); - assertEquals("Hostname test#2 error!", - "/rack1.myorg.com/host1.myorg.com", hname.getValue()); - - // test hostname and rackname separately - hname = new NodeName("rack1.myorg.com", "host1.myorg.com"); - assertEquals("Hostname's test#3 rackname error!", - "rack1.myorg.com", hname.getRackName()); - assertEquals("Hostname test#3 hostname error!", - "host1.myorg.com", hname.getHostName()); - assertEquals("Hostname test#3 error!", - "/rack1.myorg.com/host1.myorg.com", hname.getValue()); - - // test hostname with no rackname - hname = new NodeName(null, "host1.myorg.com"); - assertNull("Hostname's test#4 rackname error!", hname.getRackName()); - assertEquals("Hostname test#4 hostname error!", - "host1.myorg.com", hname.getHostName()); - assertEquals("Hostname test#4 error!", - "host1.myorg.com", hname.getValue()); - - // test rackname with no hostname - hname = new NodeName("rack1.myorg.com", null); - assertEquals("Hostname test#4 rackname error!", - "rack1.myorg.com", hname.getRackName()); - assertNull("Hostname's test#5 hostname error!", hname.getHostName()); - assertEquals("Hostname test#5 error!", - "rack1.myorg.com", hname.getValue()); - } - - /** - * Test {@link NodeName} serialization. 
- */ - @Test - public void testNodeNameDefaultSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - // test hostname serialization - // test with no anonymization - // test hostname - testSerializer(new NodeName("hostname.myorg.com"), "hostname.myorg.com", - defaultSerializer); - // test rack/hostname - testSerializer(new NodeName("/rackname.myorg.com/hostname.myorg.com"), - "/rackname.myorg.com/hostname.myorg.com", - defaultSerializer); - // test rack,hostname - testSerializer(new NodeName("rackname.myorg.com", "hostname.myorg.com"), - "/rackname.myorg.com/hostname.myorg.com", - defaultSerializer); - // test -,hostname - testSerializer(new NodeName(null, "hostname.myorg.com"), - "hostname.myorg.com", defaultSerializer); - // test rack,- - testSerializer(new NodeName("rackname.myorg.com", null), - "rackname.myorg.com", defaultSerializer); - } - - /** - * Test {@link NodeName} anonymization. - */ - @Test - public void testNodeNameAnonymization() throws IOException { - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - - // test hostname serializer - // test with anonymization - // test hostname - testSerializer(new NodeName("hostname.myorg.com"), "host0", - anonymizingSerializer); - // test hostname reuse - testSerializer(new NodeName("hostname213.myorg.com"), "host1", - anonymizingSerializer); - // test rack/hostname - testSerializer(new NodeName("/rackname.myorg.com/hostname.myorg.com"), - "/rack0/host0", anonymizingSerializer); - // test rack/hostname (hostname reuse) - testSerializer(new NodeName("/rackname654.myorg.com/hostname.myorg.com"), - "/rack1/host0", anonymizingSerializer); - // test rack/hostname (rack reuse) - testSerializer(new NodeName("/rackname654.myorg.com/hostname765.myorg.com"), - "/rack1/host2", anonymizingSerializer); - // test rack,hostname (rack & hostname reuse) - testSerializer(new NodeName("rackname.myorg.com", "hostname.myorg.com"), - "/rack0/host0", anonymizingSerializer); - // test rack,hostname (rack reuse) - testSerializer(new NodeName("rackname.myorg.com", "hostname543.myorg.com"), - "/rack0/host3", anonymizingSerializer); - // test rack,hostname (hostname reuse) - testSerializer(new NodeName("rackname987.myorg.com", "hostname.myorg.com"), - "/rack2/host0", anonymizingSerializer); - // test rack,hostname (rack reuse) - testSerializer(new NodeName("rackname.myorg.com", "hostname654.myorg.com"), - "/rack0/host4", anonymizingSerializer); - // test rack,hostname (host reuse) - testSerializer(new NodeName("rackname876.myorg.com", "hostname.myorg.com"), - "/rack3/host0", anonymizingSerializer); - // test rack,hostname (rack & hostname reuse) - testSerializer(new NodeName("rackname987.myorg.com", - "hostname543.myorg.com"), - "/rack2/host3", anonymizingSerializer); - // test -,hostname (hostname reuse) - testSerializer(new NodeName(null, "hostname.myorg.com"), - "host0", anonymizingSerializer); - // test -,hostname - testSerializer(new NodeName(null, "hostname15.myorg.com"), - "host5", anonymizingSerializer); - // test rack,- (rack reuse) - testSerializer(new NodeName("rackname987.myorg.com", null), - "rack2", anonymizingSerializer); - // test rack,- - testSerializer(new NodeName("rackname15.myorg.com", null), - "rack4", anonymizingSerializer); - } - - /** - * Test {@link JobProperties}. 
- */ - @Test - public void testJobPropertiesDataType() throws IOException { - // test job properties - Properties properties = new Properties(); - JobProperties jp = new JobProperties(properties); - - // test empty job properties - assertEquals("Job Properties (default) store error", - 0, jp.getValue().size()); - // test by adding some data - properties.put("test-key", "test-value"); // user config - properties.put(MRJobConfig.USER_NAME, "bob"); // job config - properties.put(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx1G"); // deprecated - jp = new JobProperties(properties); - assertEquals("Job Properties (default) store error", - 3, jp.getValue().size()); - assertEquals("Job Properties (default) key#1 error", - "test-value", jp.getValue().get("test-key")); - assertEquals("Job Properties (default) key#2 error", - "bob", jp.getValue().get(MRJobConfig.USER_NAME)); - assertEquals("Job Properties (default) key#3 error", - "-Xmx1G", jp.getValue().get(JobConf.MAPRED_TASK_JAVA_OPTS)); - } - - /** - * Test {@link JobProperties} serialization. - */ - @Test - public void testJobPropertiesSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - // test job properties - Properties properties = new Properties(); - properties.put("test-key", "test-value"); // user config - properties.put(MRJobConfig.USER_NAME, "bob"); // job config - properties.put(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx1G"); // deprecated - JobProperties jp = new JobProperties(properties); - - testSerializer(jp, "{test-key:test-value," - + "mapreduce.job.user.name:bob," - + "mapred.child.java.opts:-Xmx1G}", defaultSerializer); - } - - /** - * Test {@link JobProperties} anonymization. - */ - @Test - public void testJobPropertiesAnonymization() throws IOException { - // test job properties - Properties properties = new Properties(); - Configuration conf = new Configuration(); - - properties.put("test-key", "test-value"); // user config - properties.put(MRJobConfig.USER_NAME, "bob"); // job config - // deprecated - properties.put("mapred.map.child.java.opts", - "-Xmx2G -Xms500m -Dsecret=secret"); - // deprecated and not supported - properties.put(JobConf.MAPRED_TASK_JAVA_OPTS, - "-Xmx1G -Xms200m -Dsecret=secret"); - JobProperties jp = new JobProperties(properties); - - // define a module - SimpleModule module = new SimpleModule("Test Anonymization Serializer", - new Version(0, 0, 0, "TEST")); - // add various serializers to the module - module.addSerializer(DataType.class, new DefaultRumenSerializer()); - module.addSerializer(AnonymizableDataType.class, - new DefaultAnonymizingRumenSerializer(new StatePool(), - conf)); - - //TODO Support deprecated and un-supported keys - testSerializer(jp, "{mapreduce.job.user.name:user0," - + "mapred.map.child.java.opts:-Xmx2G -Xms500m}", module); - } - - /** - * Test {@link ClassName}, serialization and anonymization. 
- */ - @Test - public void testClassNameSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - - // test classname - ClassName cName = new ClassName(TestRumenAnonymization.class.getName()); - assertEquals("Classname error!", TestRumenAnonymization.class.getName(), - cName.getValue()); - - // test classname serialization - // test with no anonymization - // test class1 - testSerializer(new ClassName("org.apache.hadoop.Test"), - "org.apache.hadoop.Test", defaultSerializer); - // test class2 - testSerializer(new ClassName("org.apache.hadoop.Test2"), - "org.apache.hadoop.Test2", defaultSerializer); - - // test class-name serialization - // test with anonymization - // test class1 - testSerializer(new ClassName("org.apache.hadoop.Test1"), - "class0", anonymizingSerializer); - // test class2 - testSerializer(new ClassName("org.apache.hadoop.Test2"), - "class1", anonymizingSerializer); - - // test classnames with preserves - Configuration conf = new Configuration(); - conf.set(ClassName.CLASSNAME_PRESERVE_CONFIG, "org.apache.hadoop."); - anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), conf); - // test word with prefix - testSerializer(new ClassName("org.apache.hadoop.Test3"), - "org.apache.hadoop.Test3", anonymizingSerializer); - // test word without prefix - testSerializer(new ClassName("org.apache.hadoop2.Test4"), - "class0", anonymizingSerializer); - } - - /** - * Test {@link FileName}. - */ - @Test - public void testFileName() throws IOException { - // test file on hdfs - FileName hFile = new FileName("hdfs://testnn:123/user/test.json"); - assertEquals("Filename error!", "hdfs://testnn:123/user/test.json", - hFile.getValue()); - // test file on local-fs - hFile = new FileName("file:///user/test.json"); - assertEquals("Filename error!", "file:///user/test.json", - hFile.getValue()); - // test dir on hdfs - hFile = new FileName("hdfs://testnn:123/user/"); - assertEquals("Filename error!", "hdfs://testnn:123/user/", - hFile.getValue()); - // test dir on local-fs - hFile = new FileName("file:///user/"); - assertEquals("Filename error!", "file:///user/", hFile.getValue()); - // test absolute file - hFile = new FileName("/user/test/test.json"); - assertEquals("Filename error!", "/user/test/test.json", hFile.getValue()); - // test absolute directory - hFile = new FileName("/user/test/"); - assertEquals("Filename error!", "/user/test/", hFile.getValue()); - // test relative file - hFile = new FileName("user/test/test2.json"); - assertEquals("Filename error!", "user/test/test2.json", hFile.getValue()); - // test relative directory - hFile = new FileName("user/test/"); - assertEquals("Filename error!", "user/test/", hFile.getValue()); - // test absolute file - hFile = new FileName("user"); - assertEquals("Filename error!", "user", hFile.getValue()); - // test absolute directory - hFile = new FileName("user/"); - assertEquals("Filename error!", "user/", hFile.getValue()); - hFile = new FileName("./tmp"); - assertEquals("Filename error!","./tmp", hFile.getValue()); - hFile = new FileName("./tmp/"); - assertEquals("Filename error!","./tmp/", hFile.getValue()); - hFile = new FileName("../tmp"); - assertEquals("Filename error!","../tmp", hFile.getValue()); - hFile = new FileName("../tmp/"); - assertEquals("Filename error!","../tmp/", hFile.getValue()); - - // test comma separated filenames - // test 
hdfs filenames, absolute and local-fs filenames - hFile = new FileName("hdfs://testnn:123/user/test1," - + "file:///user/test2,/user/test3"); - assertEquals("Filename error!", - "hdfs://testnn:123/user/test1,file:///user/test2,/user/test3", - hFile.getValue()); - } - - /** - * Test {@link FileName} serialization. - */ - @Test - public void testFileNameSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - // test filename serialization - // test with no anonymization - // test a file on hdfs - testSerializer(new FileName("hdfs://mynn:123/home/user/test.json"), - "hdfs://mynn:123/home/user/test.json", defaultSerializer); - // test a file on local-fs - testSerializer(new FileName("file:///home/user/test.json"), - "file:///home/user/test.json", defaultSerializer); - // test directory on hdfs - testSerializer(new FileName("hdfs://mynn:123/home/user/"), - "hdfs://mynn:123/home/user/", defaultSerializer); - // test directory on local fs - testSerializer(new FileName("file:///home/user/"), - "file:///home/user/", defaultSerializer); - // test absolute file - testSerializer(new FileName("/home/user/test.json"), - "/home/user/test.json", defaultSerializer); - // test relative file - testSerializer(new FileName("home/user/test.json"), - "home/user/test.json", defaultSerializer); - // test absolute folder - testSerializer(new FileName("/home/user/"), "/home/user/", - defaultSerializer); - // test relative folder - testSerializer(new FileName("home/user/"), "home/user/", - defaultSerializer); - // relative file - testSerializer(new FileName("home"), "home", defaultSerializer); - // relative folder - testSerializer(new FileName("home/"), "home/", defaultSerializer); - // absolute file - testSerializer(new FileName("/home"), "/home", defaultSerializer); - // absolute folder - testSerializer(new FileName("/home/"), "/home/", defaultSerializer); - // relative folder - testSerializer(new FileName("./tmp"), "./tmp", defaultSerializer); - testSerializer(new FileName("./tmp/"), "./tmp/", defaultSerializer); - testSerializer(new FileName("../tmp"), "../tmp", defaultSerializer); - - // test comma separated filenames - // test hdfs filenames, absolute and local-fs filenames - FileName fileName = - new FileName("hdfs://testnn:123/user/test1,file:///user/test2," - + "/user/test3"); - testSerializer(fileName, - "hdfs://testnn:123/user/test1,file:///user/test2,/user/test3", - defaultSerializer); - } - - /** - * Test {@link FileName} anonymization. 
- */ - @Test - public void testFileNameAnonymization() throws IOException { - JsonSerializer anonymizingSerializer = - new DefaultAnonymizingRumenSerializer(new StatePool(), - new Configuration()); - - // test filename serialization - // test with no anonymization - // test hdfs file - testSerializer(new FileName("hdfs://mynn:123/home/user/bob/test.json"), - "hdfs://host0/home/user/dir0/test.json", anonymizingSerializer); - // test local-fs file - testSerializer(new FileName("file:///home/user/alice/test.jar"), - "file:///home/user/dir1/test.jar", anonymizingSerializer); - // test hdfs dir - testSerializer(new FileName("hdfs://mynn:123/home/user/"), - "hdfs://host0/home/user/", anonymizingSerializer); - // test local-fs dir - testSerializer(new FileName("file:///home/user/secret/more-secret/"), - "file:///home/user/dir2/dir3/", anonymizingSerializer); - // test absolute filenames - testSerializer(new FileName("/home/user/top-secret.txt"), - "/home/user/file0.txt", anonymizingSerializer); - // test relative filenames - testSerializer(new FileName("home/user/top-top-secret.zip"), - "home/user/file1.zip", anonymizingSerializer); - // test absolute dirnames - testSerializer(new FileName("/home/user/project1/"), - "/home/user/dir4/", anonymizingSerializer); - // test relative filenames - testSerializer(new FileName("home/user/project1"), - "home/user/file2", anonymizingSerializer); - // test absolute dirnames (re-use) - testSerializer(new FileName("more-secret/"), - "dir3/", anonymizingSerializer); - // test relative filenames (re-use) - testSerializer(new FileName("secret/project1"), - "dir2/file2", anonymizingSerializer); - // test absolute filenames (re-use) - testSerializer(new FileName("/top-secret.txt"), - "/file0.txt", anonymizingSerializer); - // test relative filenames (re-use) - testSerializer(new FileName("top-top-secret.tar"), - "file1.tar", anonymizingSerializer); - // test absolute dirname - testSerializer(new FileName("sensitive-projectname/"), - "dir5/", anonymizingSerializer); - // test relative filenames - testSerializer(new FileName("/real-sensitive-projectname/"), - "/dir6/", anonymizingSerializer); - // test absolute filenames - testSerializer(new FileName("/usernames.xml"), - "/file3.xml", anonymizingSerializer); - // test relative filenames - testSerializer(new FileName("passwords.zip"), - "file4.zip", anonymizingSerializer); - // test relative filenames - testSerializer(new FileName("./tmp"), - "./tmp", anonymizingSerializer); - testSerializer(new FileName("./tmp/"), - "./tmp/", anonymizingSerializer); - testSerializer(new FileName("../tmp"), - "../tmp", anonymizingSerializer); - testSerializer(new FileName("../tmp/"), - "../tmp/", anonymizingSerializer); - - // test comma separated filenames - // test hdfs filenames, absolute and local-fs filenames - FileName fileName = - new FileName("hdfs://mynn:123/home/user/bob/test.json," - + "file:///home/user/bob/test.json,/user/alice/test.json"); - testSerializer(fileName, - "hdfs://host0/home/user/dir0/test.json,file:///home/user/dir0/test.json" - + ",/user/dir1/test.json", - anonymizingSerializer); - } - - - /** - * Test {@link DefaultDataType} serialization. 
- */ - @Test - public void testDefaultDataTypeSerialization() throws IOException { - JsonSerializer defaultSerializer = new DefaultRumenSerializer(); - - // test default data-type - DefaultDataType dt = new DefaultDataType("test"); - assertEquals("DefaultDataType error!", "test", dt.getValue()); - - // test default data-type - // test with no anonymization - // test data - testSerializer(new DefaultDataType("test"), "test", defaultSerializer); - } - - // A faked OutputStream which stores the stream content into a StringBuffer. - private static class MyOutputStream extends OutputStream { - private StringBuffer data = new StringBuffer(); - - @Override - public void write(int b) throws IOException { - data.append((char)b); - } - - @Override - public void write(byte[] b) throws IOException { - data.append(b); - } - - @Override - public String toString() { - // remove all the '"' for ease of testing - return data.toString().trim().replaceAll("\"", ""); - } - } - - // tests the object serializing using the class of the specified object - @SuppressWarnings("unchecked") - private static void testSerializer(Object toBeSerialized, String expData, - JsonSerializer serializer) - throws IOException { - // define a module - SimpleModule module = new SimpleModule("Test Anonymization Serializer", - new Version(0, 0, 0, "TEST")); - // add various serializers to the module - module.addSerializer(toBeSerialized.getClass(), serializer); - testSerializer(toBeSerialized, expData, module); - } - - // tests the object serializing using the specified class - private static void testSerializer(Object toBeSerialized, String expData, - SimpleModule module) - throws IOException { - // define a custom generator - ObjectMapper outMapper = new ObjectMapper(); - - // register the module - outMapper.registerModule(module); - - // get the json factory - JsonFactory outFactory = outMapper.getJsonFactory(); - // define a fake output stream which will cache the data - MyOutputStream output = new MyOutputStream(); - // define the json output generator - JsonGenerator outGen = - outFactory.createJsonGenerator(output, JsonEncoding.UTF8); - - // serialize the object - outGen.writeObject(toBeSerialized); - //serializer.serialize(toBeSerialized, outGen, null); - - // close the json generator so that it flushes out the data to the output - // stream - outGen.close(); - - assertEquals("Serialization failed!", expData, output.toString()); - } - - /** - * Test {@link DefaultRumenSerializer}. 
- */ - @Test - public void testDefaultDataSerializers() throws Exception { - JsonSerializer defaultSer = new DefaultRumenSerializer(); - // test default data-type - // test with no anonymization - // test data - testSerializer(new DefaultDataType("test"), "test", defaultSer); - } - - @Test - public void testBlockingDataSerializers() throws Exception { - JsonSerializer blockingSerializer = new BlockingSerializer(); - - // test string serializer - testSerializer("username:password", "null", blockingSerializer); - } - - @Test - public void testObjectStringDataSerializers() throws Exception { - JsonSerializer objectStringSerializer = new ObjectStringSerializer(); - // test job/task/attempt id serializer - // test job-id - JobID jid = JobID.forName("job_1_1"); - testSerializer(jid, jid.toString(), objectStringSerializer); - // test task-id - TaskID tid = new TaskID(jid, TaskType.MAP, 1); - testSerializer(tid, tid.toString(), objectStringSerializer); - // test attempt-id - TaskAttemptID aid = new TaskAttemptID(tid, 0); - testSerializer(aid, aid.toString(), objectStringSerializer); - } - - // test anonymizer - @Test - public void testRumenAnonymization() throws Exception { - Configuration conf = new Configuration(); - - // Run a MR job - // create a MR cluster - conf.setInt(TTConfig.TT_MAP_SLOTS, 1); - conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1); - - MiniDFSCluster dfsCluster = null; - MiniMRCluster mrCluster = null; - - // local filesystem for running TraceBuilder - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testRumenAnonymization"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - try { - dfsCluster = new MiniDFSCluster(conf, 1, true, null); - String[] racks = new String[] {"/rack123.myorg.com", - "/rack456.myorg.com"}; - String[] hosts = new String[] {"host1230.myorg.com", - "host4560.myorg.com"}; - mrCluster = - new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), - 1, racks, hosts, new JobConf(conf)); - - // run a job - Path inDir = new Path("secret-input"); - Path outDir = new Path("secret-output"); - - JobConf jConf = mrCluster.createJobConf(); - // add some usr sensitive data in the job conf - jConf.set("user-secret-code", "abracadabra"); - - jConf.setJobName("top-secret"); - // construct a job with 1 map and 1 reduce task. 
- Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 2, 2); - // wait for the job to complete - job.waitForCompletion(false); - - assertTrue("Job failed", job.isSuccessful()); - - JobID id = job.getJobID(); - Cluster cluster = new Cluster(jConf); - String user = cluster.getAllJobStatuses()[0].getUsername(); - - // get the jobhistory filepath - Path jhPath = - new Path(mrCluster.getJobTrackerRunner().getJobTracker() - .getJobHistoryDir()); - Path inputLogPath = JobHistory.getJobHistoryFile(jhPath, id, user); - Path inputConfPath = JobHistory.getConfFile(jhPath, id); - // wait for 10 secs for the jobhistory file to move into the done folder - FileSystem fs = inputLogPath.getFileSystem(jConf); - for (int i = 0; i < 100; ++i) { - if (fs.exists(inputLogPath)) { - break; - } - TimeUnit.MILLISECONDS.wait(100); - } - - assertTrue("Missing job history file", fs.exists(inputLogPath)); - - // run trace builder on the job history logs - Path goldTraceFilename = new Path(tempDir, "trace.json"); - Path goldTopologyFilename = new Path(tempDir, "topology.json"); - - // build the trace-builder command line args - String[] args = new String[] {goldTraceFilename.toString(), - goldTopologyFilename.toString(), - inputLogPath.toString(), - inputConfPath.toString()}; - Tool analyzer = new TraceBuilder(); - int result = ToolRunner.run(analyzer, args); - assertEquals("Non-zero exit", 0, result); - - // anonymize the job trace - Path anonymizedTraceFilename = new Path(tempDir, "trace-anonymized.json"); - Path anonymizedClusterTopologyFilename = - new Path(tempDir, "topology-anonymized.json"); - args = new String[] {"-trace", goldTraceFilename.toString(), - anonymizedTraceFilename.toString(), - "-topology", goldTopologyFilename.toString(), - anonymizedClusterTopologyFilename.toString()}; - Tool anonymizer = new Anonymizer(); - result = ToolRunner.run(anonymizer, args); - assertEquals("Non-zero exit", 0, result); - - JobTraceReader reader = new JobTraceReader(anonymizedTraceFilename, conf); - LoggedJob anonymizedJob = reader.getNext(); - reader.close(); // close the reader as we need only 1 job - // test - // user-name - String currentUser = UserGroupInformation.getCurrentUser().getUserName(); - assertFalse("Username not anonymized!", - currentUser.equals(anonymizedJob.getUser().getValue())); - // jobid - assertEquals("JobID mismatch!", - id.toString(), anonymizedJob.getJobID().toString()); - // queue-name - assertFalse("Queuename mismatch!", - "default".equals(anonymizedJob.getQueue().getValue())); - // job-name - assertFalse("Jobname mismatch!", - "top-secret".equals(anonymizedJob.getJobName().getValue())); - - // job properties - for (Map.Entry entry : - anonymizedJob.getJobProperties().getValue().entrySet()) { - assertFalse("User sensitive configuration key not anonymized", - entry.getKey().toString().equals("user-secret-code")); - assertFalse("User sensitive data not anonymized", - entry.getValue().toString().contains(currentUser)); - assertFalse("User sensitive data not anonymized", - entry.getValue().toString().contains("secret")); - } - - // test map tasks - testTasks(anonymizedJob.getMapTasks(), id, TaskType.MAP); - - // test reduce tasks - testTasks(anonymizedJob.getReduceTasks(), id, TaskType.REDUCE); - - // test other tasks - testTasks(anonymizedJob.getOtherTasks(), id, null); - - // test the anonymized cluster topology file - ClusterTopologyReader cReader = - new ClusterTopologyReader(anonymizedClusterTopologyFilename, conf); - LoggedNetworkTopology loggedNetworkTopology = cReader.get(); - // 
test the cluster topology - testClusterTopology(loggedNetworkTopology, 0, "myorg"); - } finally { - // shutdown and cleanup - if (mrCluster != null) { - mrCluster.shutdown(); - } - - if (dfsCluster != null) { - dfsCluster.formatDataNodeDirs(); - dfsCluster.shutdown(); - } - lfs.delete(tempDir, true); - } - } - - // test task level details lije - // - taskid - // - locality info - // - attempt details - // - attempt execution hostname - private static void testTasks(List tasks, JobID id, - TaskType type) { - int index = 0; - for (LoggedTask task : tasks) { - // generate the expected task id for this task - if (type != null) { - TaskID tid = new TaskID(id, type, index++); - assertEquals("TaskID mismatch!", - tid.toString(), task.getTaskID().toString()); - } - - // check locality information - if (task.getPreferredLocations() != null) { - for (LoggedLocation loc : task.getPreferredLocations()) { - for (NodeName name : loc.getLayers()) { - assertFalse("Hostname mismatch!", - name.getValue().contains("myorg")); - } - } - } - - // check execution host - for (LoggedTaskAttempt attempt : task.getAttempts()) { - // generate the expected task id for this task - TaskAttemptID aid = new TaskAttemptID(task.getTaskID(), 0); - assertEquals("TaskAttemptID mismatch!", - aid.toString(), attempt.getAttemptID().toString()); - - assertNotNull("Hostname null!", attempt.getHostName()); - assertFalse("Hostname mismatch!", - attempt.getHostName().getValue().contains("myorg")); - } - } - } - - // tests the logged network topology - private static void testClusterTopology(LoggedNetworkTopology topology, - int level, String bannedString) { - assertFalse("Cluster topology test failed!", - topology.getName().getValue().contains(bannedString)); - if (level == 0) { - assertEquals("Level-1 data mismatch!", - "", topology.getName().getValue()); - } else if (level == 1) { - assertTrue("Level-2 data mismatch!", - topology.getName().getValue().contains("rack")); - assertFalse("Level-2 data mismatch!", - topology.getName().getValue().contains("host")); - } else { - assertTrue("Level-2 data mismatch!", - topology.getName().getValue().contains("host")); - assertFalse("Level-2 data mismatch!", - topology.getName().getValue().contains("rack")); - } - - // if the current node is a rack, then test the nodes under it - if (topology.getChildren() != null) { - for (LoggedNetworkTopology child : topology.getChildren()) { - testClusterTopology(child, level + 1, bannedString); - } - } - } - - @Test - public void testCLI() throws Exception { - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testCLI"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // test no args - String[] args = new String[] {}; - testAnonymizerCLI(args, -1); - - // test with wrong args - args = new String[] {"-test"}; - testAnonymizerCLI(args, -1); - - args = new String[] {"-trace"}; - testAnonymizerCLI(args, -1); - - args = new String[] {"-topology"}; - testAnonymizerCLI(args, -1); - - args = new String[] {"-trace -topology"}; - testAnonymizerCLI(args, -1); - - Path testTraceInputFilename = new Path(tempDir, "trace-in.json"); - args = new String[] {"-trace", testTraceInputFilename.toString()}; - testAnonymizerCLI(args, -1); - - Path testTraceOutputFilename = new Path(tempDir, "trace-out.json"); - args = new String[] {"-trace", testTraceInputFilename.toString(), - 
testTraceOutputFilename.toString()}; - testAnonymizerCLI(args, -1); - - OutputStream out = lfs.create(testTraceInputFilename); - out.write("{\n}".getBytes()); - out.close(); - args = new String[] {"-trace", testTraceInputFilename.toString(), - testTraceOutputFilename.toString()}; - testAnonymizerCLI(args, 0); - - Path testToplogyInputFilename = new Path(tempDir, "topology-in.json"); - args = new String[] {"-topology", testToplogyInputFilename.toString()}; - testAnonymizerCLI(args, -1); - - Path testTopologyOutputFilename = new Path(tempDir, "topology-out.json"); - args = new String[] {"-topology", testToplogyInputFilename.toString(), - testTopologyOutputFilename.toString()}; - testAnonymizerCLI(args, -1); - - out = lfs.create(testToplogyInputFilename); - out.write("{\n}".getBytes()); - out.close(); - args = new String[] {"-topology", testToplogyInputFilename.toString(), - testTopologyOutputFilename.toString()}; - testAnonymizerCLI(args, 0); - - args = new String[] {"-trace", testTraceInputFilename.toString(), - "-topology", testToplogyInputFilename.toString()}; - testAnonymizerCLI(args, -1); - - args = new String[] {"-trace", testTraceInputFilename.toString(), - testTraceOutputFilename.toString(), - "-topology", testToplogyInputFilename.toString(), - testTopologyOutputFilename.toString()}; - testAnonymizerCLI(args, 0); - } - - // tests the Anonymizer CLI via the Tools interface - private static void testAnonymizerCLI(String[] args, int eExitCode) - throws Exception { - Anonymizer anonymizer = new Anonymizer(); - - int exitCode = ToolRunner.run(anonymizer, args); - assertEquals("Exit code mismatch", eExitCode, exitCode); - } - - /** - * Test {@link StatePool}'s reload and persistence feature. - */ - @Test - public void testStatePool() throws Exception { - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testStatePool"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the state dir - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - - StatePool pool = new StatePool(); - // test reload, persist and dir config - // test with no reload and persist - pool.initialize(conf); - - // test with reload and/or persist enabled with no dir - assertNull("Default state pool error", - pool.getState(MyState.class)); - - // try persisting - pool.persist(); - assertFalse("State pool persisted when disabled", lfs.exists(tempDir)); - - // test wrongly configured state-pool - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - conf.unset(StatePool.DIR_CONFIG); - pool = new StatePool(); - boolean success = true; - try { - pool.initialize(conf); - } catch (Exception e) { - success = false; - } - assertFalse("State pool bad configuration succeeded", success); - - // test wrongly configured state-pool - conf.setBoolean(StatePool.RELOAD_CONFIG, false); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - pool = new StatePool(); - success = true; - try { - pool.initialize(conf); - } catch (Exception e) { - success = false; - } - assertFalse("State manager bad configuration succeeded", success); - - - // test persistence - conf.setBoolean(StatePool.RELOAD_CONFIG, false); - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - pool = new StatePool(); - pool.initialize(conf); - - // add states to the state pool - MyState myState = new MyState(); - pool.addState(MyState.class, 
myState); - myState.setState("test-1"); - // try persisting - pool.persist(); - assertTrue("State pool persisted when enabled", lfs.exists(tempDir)); - assertEquals("State pool persisted when enabled", - 1, lfs.listStatus(tempDir).length); - - // reload - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - pool = new StatePool(); - pool.initialize(conf); - MyState pState = - (MyState) pool.getState(MyState.class); - assertEquals("State pool persistence/reload failed", "test-1", - pState.getState()); - - // try persisting with no state change - pool.persist(); - assertEquals("State pool persisted when disabled", - 1, lfs.listStatus(tempDir).length); - - // modify the state of the pool and check persistence - pState.setUpdated(true); - pool.persist(); - assertEquals("State pool persisted when disabled", - 2, lfs.listStatus(tempDir).length); - - // delete the temp directory if everything goes fine - lfs.delete(tempDir, true); - } - - /** - * Test state. - */ - static class MyState implements State { - private boolean updated = false; - private String state = "null"; - - @Override - @JsonIgnore - public String getName() { - return "test"; - } - - @Override - public void setName(String name) { - // for now, simply assert since this class has a hardcoded name - if (!getName().equals(name)) { - throw new RuntimeException("State name mismatch! Expected '" - + getName() + "' but found '" + name + "'."); - } - } - - public void setState(String state) { - this.state = state; - } - - public String getState() { - return state; - } - - void setUpdated(boolean up) { - this.updated = up; - } - - @Override - @JsonIgnore - public boolean isUpdated() { - return updated; - } - } - - @SuppressWarnings("unchecked") - private static String getValueFromDataType(Object object) { - DataType dt = (DataType) object; - return dt.getValue(); - } - - @Test - public void testJobPropertiesParser() { - // test default parser - Properties properties = new Properties(); - Configuration conf = new Configuration(); - JobProperties jp = new JobProperties(properties); - assertEquals("Job Properties (default filter) store error", - 0, jp.getAnonymizedValue(null, conf).size()); - - // define key-value pairs for job configuration - String key1 = "test-key"; - String value1 = "test-value"; - properties.put(key1, value1); // user config - String key2 = MRJobConfig.USER_NAME; - String value2 = "bob"; - properties.put(key2, value2); // job config - String key3 = JobConf.MAPRED_MAP_TASK_JAVA_OPTS; - String value3 = "-Xmx1G"; - properties.put(key3, value3); // deprecated - String key4 = MRJobConfig.REDUCE_JAVA_OPTS; - String value4 = "-Xms100m"; - properties.put(key4, value4); - - jp = new JobProperties(properties); - - // Configure the default parser - conf.set(JobProperties.PARSERS_CONFIG_KEY, - DefaultJobPropertiesParser.class.getName()); - // anonymize - Properties defaultProp = jp.getAnonymizedValue(null, conf); - assertEquals("Job Properties (all-pass filter) store error", - 4, defaultProp.size()); - assertEquals("Job Properties (default filter) key#1 error", value1, - getValueFromDataType(defaultProp.get(key1))); - assertEquals("Job Properties (default filter) key#2 error", value2, - getValueFromDataType(defaultProp.get(key2))); - assertEquals("Job Properties (default filter) key#3 error", value3, - getValueFromDataType(defaultProp.get(key3))); - assertEquals("Job Properties (default filter) key#4 error", value4, - 
getValueFromDataType(defaultProp.get(key4))); - - // test MR parser - conf.set(JobProperties.PARSERS_CONFIG_KEY, - MapReduceJobPropertiesParser.class.getName()); - // anonymize - Properties filteredProp = jp.getAnonymizedValue(null, conf); - assertEquals("Job Properties (MR filter) store error", - 3, filteredProp.size()); - assertNull("Job Properties (MR filter) key#1 error", - filteredProp.get(key1)); - assertEquals("Job Properties (MR filter) key#2 error", value2, - getValueFromDataType(filteredProp.get(key2))); - assertEquals("Job Properties (MR filter) key#3 error", value3, - getValueFromDataType(filteredProp.get(key3))); - assertEquals("Job Properties (MR filter) key#4 error", value4, - getValueFromDataType(filteredProp.get(key4))); - } - - /** - * Test {@link WordListAnonymizerUtility}. Test various features like - * - test known words - * - test known suffix - */ - @Test - public void testWordListBasedAnonymizer() { - String[] knownSuffixes = new String[] {".1", ".2", ".3", ".4"}; - - // test with valid suffix - assertTrue("suffix test#0 failed!", - WordListAnonymizerUtility.hasSuffix("a.1", knownSuffixes)); - String split[] = - WordListAnonymizerUtility.extractSuffix("a.1", knownSuffixes); - assertEquals("suffix test#1 failed!", 2, split.length); - assertEquals("suffix test#2 failed!", "a", split[0]); - assertEquals("suffix test#3 failed!", ".1", split[1]); - - // test with valid suffix - assertTrue("suffix test#0 failed!", - WordListAnonymizerUtility.hasSuffix("a.1", knownSuffixes)); - split = - WordListAnonymizerUtility.extractSuffix("/a/b.2", knownSuffixes); - assertEquals("suffix test#0 failed!", 2, split.length); - assertEquals("suffix test#1 failed!", "/a/b", split[0]); - assertEquals("suffix test#2 failed!", ".2", split[1]); - - // test with invalid suffix - assertFalse("suffix test#0 failed!", - WordListAnonymizerUtility.hasSuffix("a.b", knownSuffixes)); - - boolean failed = false; - try { - split = WordListAnonymizerUtility.extractSuffix("a.b", knownSuffixes); - } catch (Exception e) { - failed = true; - } - assertTrue("Exception expected!", failed); - - String[] knownWords = new String[] {"a", "b"}; - - // test with valid data - assertTrue("data test#0 failed!", - WordListAnonymizerUtility.isKnownData("a", knownWords)); - // test with valid data - assertTrue("data test#1 failed!", - WordListAnonymizerUtility.isKnownData("b", knownWords)); - // test with invalid data - assertFalse("data test#2 failed!", - WordListAnonymizerUtility.isKnownData("c", knownWords)); - - // test with valid known word - assertTrue("data test#3 failed!", - WordListAnonymizerUtility.isKnownData("job")); - // test with invalid known word - assertFalse("data test#4 failed!", - WordListAnonymizerUtility.isKnownData("bob")); - - // test numeric data - assertFalse("Numeric test failed!", - WordListAnonymizerUtility.needsAnonymization("123")); - // test numeric data (unsupported) - assertTrue("Numeric test failed!", - WordListAnonymizerUtility.needsAnonymization("123.456")); - // test text data - assertTrue("Text test failed!", - WordListAnonymizerUtility.needsAnonymization("123abc")); - } - - /** - * Test {@link WordList} features like - * - add words - * - index - * - contains - */ - @Test - public void testWordList() throws Exception { - // test features with fresh state - WordList wordList = new WordList(); - assertFalse("Word list state incorrect", wordList.isUpdated()); - - // add some special word - String test = "abbracadabra"; - wordList.add(test); - assertTrue("Word list failed to store", 
wordList.contains(test)); - assertEquals("Word list index failed", 0, wordList.indexOf(test)); - assertEquals("Word list size failed", 1, wordList.getSize()); - assertTrue("Word list state incorrect", wordList.isUpdated()); - - // add already added word - wordList.add(test); - assertEquals("Word list index failed", 0, wordList.indexOf(test)); - assertEquals("Word list size failed", 1, wordList.getSize()); - assertTrue("Word list state incorrect", wordList.isUpdated()); - - String test2 = "hakuna-matata"; - wordList.add(test2); - assertTrue("Word list failed to store", wordList.contains(test2)); - assertEquals("Word list index failed", 1, wordList.indexOf(test2)); - assertEquals("Word list size failed", 2, wordList.getSize()); - assertTrue("Word list state incorrect", wordList.isUpdated()); - - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testWordList"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // define a state pool to help persist the wordlist - StatePool pool = new StatePool(); - - try { - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), wordList); - - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", WordList.class, state.getClass()); - WordList pList = (WordList) state; - - // check size - assertEquals("Word list size on reload failed", 2, pList.getSize()); - assertFalse("Word list state incorrect", pList.isUpdated()); - - // add already added word - pList.add(test); - assertEquals("Word list index on reload failed", 0, pList.indexOf(test)); - assertEquals("Word list size on reload failed", 2, pList.getSize()); - assertFalse("Word list state on reload incorrect", pList.isUpdated()); - - String test3 = "disco-dancer"; - assertFalse("Word list failed to after reload", pList.contains(test3)); - pList.add(test3); - assertTrue("Word list failed to store on reload", pList.contains(test3)); - assertEquals("Word list index on reload failed", 2, pList.indexOf(test3)); - assertEquals("Word list size on reload failed", 3, pList.getSize()); - assertTrue("Word list state on reload incorrect", pList.isUpdated()); - - // test previously added (persisted) word - assertTrue("Word list failed to store on reload", pList.contains(test2)); - assertEquals("Word list index on reload failed", 1, pList.indexOf(test2)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link FileName#FileNameState} persistence with directories only. 
- */ - @Test - public void testFileNameStateWithDir() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testFileNameStateWithDir"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the dirs - StatePool pool = new StatePool(); - - FileNameState fState = new FileNameState(); - - // define the directory names - String test1 = "test"; - String test2 = "home"; - - // test dir only - WordList dirState = new WordList("dir"); - dirState.add(test1); - dirState.add(test2); - - // set the directory state - fState.setDirectoryState(dirState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), fState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - FileNameState.class, state.getClass()); - FileNameState newFState = (FileNameState) state; - - // check the state contents - WordList newStateWordList = newFState.getDirectoryState(); - assertTrue("File state failed to store on reload", - newStateWordList.contains(test1)); - assertEquals("File state index on reload failed", - 0, newStateWordList.indexOf(test1)); - - assertTrue("File state failed to store on reload", - newStateWordList.contains(test2)); - assertEquals("File state index on reload failed", - 1, newStateWordList.indexOf(test2)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link FileName#FileNameState} persistence with files only. 
- */ - @Test - public void testFileNameStateWithFiles() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testFileNameStateWithFiles"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the filename parts - StatePool pool = new StatePool(); - - FileNameState fState = new FileNameState(); - - // define the file names - String test1 = "part-00.bzip"; - String test2 = "file1.txt"; - - // test filenames only - WordList fileNameState = new WordList("files"); - fileNameState.add(test1); - fileNameState.add(test2); - - // set the filename state - fState.setDirectoryState(fileNameState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), fState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - FileNameState.class, state.getClass()); - FileNameState newFState = (FileNameState) state; - - // check the state contents - WordList newFileWordList = newFState.getDirectoryState(); - assertTrue("File state failed on reload", - newFileWordList.contains(test1)); - assertEquals("File state indexing on reload failed", - 0, newFileWordList.indexOf(test1)); - - assertTrue("File state failed on reload", - newFileWordList.contains(test2)); - assertEquals("File state indexing on reload failed", - 1, newFileWordList.indexOf(test2)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link FileName#FileNameState} persistence with files and directories. 
- */ - @Test - public void testFileNameState() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testFileNameState"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the filename parts - StatePool pool = new StatePool(); - - FileNameState fState = new FileNameState(); - - // define the directory names - String testD1 = "test"; - String testD2 = "home"; - String testD3 = "tmp"; - - // test dir only - WordList dirState = new WordList("dir"); - dirState.add(testD1); - dirState.add(testD2); - dirState.add(testD3); - - // define the file names - String testF1 = "part-00.bzip"; - String testF2 = "file1.txt"; - String testF3 = "tmp"; - - // test filenames only - WordList fileNameState = new WordList("files"); - fileNameState.add(testF1); - fileNameState.add(testF2); - fileNameState.add(testF3); - - // set the filename state - fState.setFileNameState(fileNameState); - // set the directory state - fState.setDirectoryState(dirState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), fState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - FileNameState.class, state.getClass()); - FileNameState newFState = (FileNameState) state; - - // test filenames - WordList newStateWordList = newFState.getFileNameState(); - assertTrue("File state failed on reload", - newStateWordList.contains(testF1)); - assertEquals("File state indexing on reload failed", - 0, newStateWordList.indexOf(testF1)); - - assertTrue("File state failed on reload", - newStateWordList.contains(testF2)); - assertEquals("File state indexing on reload failed", - 1, newStateWordList.indexOf(testF2)); - - assertTrue("File state failed on reload", - newStateWordList.contains(testF3)); - assertEquals("File state indexing on reload failed", - 2, newStateWordList.indexOf(testF3)); - - // test dirs - WordList newDirWordList = newFState.getDirectoryState(); - assertTrue("File state failed on reload", - newDirWordList.contains(testD1)); - assertEquals("File state indexing on reload failed", - 0, newDirWordList.indexOf(testD1)); - - assertTrue("File state failed on reload", - newDirWordList.contains(testD2)); - assertEquals("File state indexing on reload failed", - 1, newDirWordList.indexOf(testD2)); - assertTrue("File state failed on reload", - newDirWordList.contains(testD3)); - assertEquals("File state indexing on reload failed", - 2, newDirWordList.indexOf(testD3)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link NodeName#NodeName} persistence with hostnames only. 
- */ - @Test - public void testNodeNameStateWithHostNameOnly() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testNodeNameStateWithHostNameOnly"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the hostnames - StatePool pool = new StatePool(); - - NodeNameState nState = new NodeNameState(); - - // define the host names - String test1 = "abc123"; - String test2 = "xyz789"; - - // test hostname only - WordList hostNameState = new WordList("hostname"); - hostNameState.add(test1); - hostNameState.add(test2); - - // set the directory state - nState.setHostNameState(hostNameState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), nState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - NodeNameState.class, state.getClass()); - NodeNameState newNState = (NodeNameState) state; - - // check the state contents - WordList newStateWordList = newNState.getHostNameState(); - assertTrue("Node state failed to store on reload", - newStateWordList.contains(test1)); - assertEquals("Node state index on reload failed", - 0, newStateWordList.indexOf(test1)); - - assertTrue("Node state failed to store on reload", - newStateWordList.contains(test2)); - assertEquals("Node state index on reload failed", - 1, newStateWordList.indexOf(test2)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link NodeName#NodeNameState} persistence with racknames only. 
- */ - @Test - public void testNodeNameWithRackNamesOnly() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testNodeNameWithRackNamesOnly"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the rack names - StatePool pool = new StatePool(); - - NodeNameState nState = new NodeNameState(); - - // define the rack names - String test1 = "rack1"; - String test2 = "rack2"; - - // test filenames only - WordList rackNameState = new WordList("racknames"); - rackNameState.add(test1); - rackNameState.add(test2); - - // set the rackname state - nState.setRackNameState(rackNameState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), nState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - NodeNameState.class, state.getClass()); - NodeNameState newNState = (NodeNameState) state; - - // check the state contents - WordList newFileWordList = newNState.getRackNameState(); - assertTrue("File state failed on reload", - newFileWordList.contains(test1)); - assertEquals("File state indexing on reload failed", - 0, newFileWordList.indexOf(test1)); - - assertTrue("File state failed on reload", - newFileWordList.contains(test2)); - assertEquals("File state indexing on reload failed", - 1, newFileWordList.indexOf(test2)); - } finally { - lfs.delete(tempDir, true); - } - } - - /** - * Test {@link NodeName#NodeNameState} persistence with hosts and racks. - */ - @Test - public void testNodeNameState() throws Exception { - // test persistence - Configuration conf = new Configuration(); - FileSystem lfs = FileSystem.getLocal(conf); - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - Path tempDir = new Path(rootTempDir, "testNodeNameState"); - tempDir = lfs.makeQualified(tempDir); - lfs.delete(tempDir, true); - - // set the persistence directory - conf.set(StatePool.DIR_CONFIG, tempDir.toString()); - conf.setBoolean(StatePool.PERSIST_CONFIG, true); - - // define a state pool to help persist the node names. 
- StatePool pool = new StatePool(); - - NodeNameState nState = new NodeNameState(); - - // define the rack names - String testR1 = "rack1"; - String testR2 = "rack2"; - String testR3 = "rack3"; - - WordList rackState = new WordList("rack"); - rackState.add(testR1); - rackState.add(testR2); - rackState.add(testR3); - - String testH1 = "host1"; - String testH2 = "host2"; - String testH3 = "host3"; - - WordList hostNameState = new WordList("host"); - hostNameState.add(testH1); - hostNameState.add(testH2); - hostNameState.add(testH3); - - // set the filename state - nState.setHostNameState(hostNameState); - nState.setRackNameState(rackState); - - try { - // initialize the state-pool - pool.initialize(conf); - - // add the wordlist to the pool - pool.addState(getClass(), nState); - - // persist the state - pool.persist(); - - // now clear the pool state - pool = new StatePool(); - - // set reload to true - conf.setBoolean(StatePool.RELOAD_CONFIG, true); - - // initialize the state-pool - pool.initialize(conf); - - State state = pool.getState(getClass()); - assertNotNull("Missing state!", state); - assertEquals("Incorrect state class!", - NodeNameState.class, state.getClass()); - NodeNameState newNState = (NodeNameState) state; - - // test nodenames - WordList newHostWordList = newNState.getHostNameState(); - assertTrue("File state failed on reload", - newHostWordList.contains(testH1)); - assertEquals("File state indexing on reload failed", - 0, newHostWordList.indexOf(testH1)); - - assertTrue("File state failed on reload", - newHostWordList.contains(testH2)); - assertEquals("File state indexing on reload failed", - 1, newHostWordList.indexOf(testH2)); - - assertTrue("File state failed on reload", - newHostWordList.contains(testH3)); - assertEquals("File state indexing on reload failed", - 2, newHostWordList.indexOf(testH3)); - - // test racknames - WordList newRackWordList = newNState.getRackNameState(); - assertTrue("File state failed on reload", - newRackWordList.contains(testR1)); - assertEquals("File state indexing on reload failed", - 0, newRackWordList.indexOf(testR1)); - - assertTrue("File state failed on reload", - newRackWordList.contains(testR2)); - assertEquals("File state indexing on reload failed", - 1, newRackWordList.indexOf(testR2)); - assertTrue("File state failed on reload", - newRackWordList.contains(testR3)); - assertEquals("File state indexing on reload failed", - 2, newRackWordList.indexOf(testR3)); - } finally { - lfs.delete(tempDir, true); - } - } -} \ No newline at end of file diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java deleted file mode 100644 index 2fe0d7194a..0000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java +++ /dev/null @@ -1,196 +0,0 @@ -package org.apache.hadoop.tools.rumen; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.io.InputStream; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.ToolRunner; - -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestRumenFolder { - @Test - public void testFoldingSmallTrace() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - @SuppressWarnings("deprecation") - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")) - .makeQualified(lfs); - @SuppressWarnings("deprecation") - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")) - .makeQualified(lfs); - - final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test"); - final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces"); - lfs.delete(tempDir, true); - - final Path foldedTracePath = new Path(tempDir, "folded-trace.json"); - - final Path inputFile = - new Path(rootInputFile, "folder-input-trace.json.gz"); - - System.out.println("folded trace result path = " + foldedTracePath); - - String[] args = - { "-input-cycle", "100S", "-output-duration", "300S", - "-skew-buffer-length", "1", "-seed", "100", "-concentration", "2", - inputFile.toString(), foldedTracePath.toString() }; - - final Path foldedGoldFile = - new Path(rootInputFile, "goldFoldedTrace.json.gz"); - - Folder folder = new Folder(); - int result = ToolRunner.run(folder, args); - assertEquals("Non-zero exit", 0, result); - - TestRumenFolder. jsonFileMatchesGold(conf, lfs, foldedTracePath, - foldedGoldFile, LoggedJob.class, "trace"); - } - - @Test - public void testStartsAfterOption() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - @SuppressWarnings("deprecation") - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")) - .makeQualified(lfs); - @SuppressWarnings("deprecation") - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")) - .makeQualified(lfs); - - final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test"); - final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces"); - lfs.delete(tempDir, true); - - final Path inputFile = - new Path(rootInputFile, "goldFoldedTrace.json.gz"); - - final Path foldedTracePath = new Path(tempDir, - "folded-skippedjob-trace.json"); - String[] args = - { "-input-cycle", "300S", "-output-duration", "300S", - "-starts-after", "30S", - inputFile.toString(), foldedTracePath.toString() }; - - Folder folder = new Folder(); - int result = ToolRunner.run(folder, args); - assertEquals("Non-zero exit", 0, result); - - TestRumenFolder. 
checkValidityAfterSkippingJobs(conf, lfs, foldedTracePath, - inputFile, LoggedJob.class, "trace", 30000, 300000); - } - - static private void - checkValidityAfterSkippingJobs(Configuration conf, - FileSystem lfs, Path result, Path inputFile, - Class clazz, String fileDescription, - long startsAfter, long duration) throws IOException { - - JsonObjectMapperParser inputFileParser = - new JsonObjectMapperParser(inputFile, clazz, conf); - InputStream resultStream = lfs.open(result); - JsonObjectMapperParser resultParser = - new JsonObjectMapperParser(resultStream, clazz); - List gpSubmitTimes = new LinkedList(); - List rpSubmitTimes = new LinkedList(); - try { - //Get submitTime of first job - LoggedJob firstJob = (LoggedJob)inputFileParser.getNext(); - gpSubmitTimes.add(firstJob.getSubmitTime()); - long absoluteStartsAfterTime = firstJob.getSubmitTime() + startsAfter; - - //total duration - long endTime = firstJob.getSubmitTime() + duration; - - //read original trace - LoggedJob oriJob = null; - while((oriJob = (LoggedJob)inputFileParser.getNext()) != null) { - gpSubmitTimes.add(oriJob.getSubmitTime()); - } - - //check if retained jobs have submittime > starts-after - LoggedJob job = null; - while((job = (LoggedJob) resultParser.getNext()) != null) { - assertTrue("job's submit time in the output trace is less " + - "than the specified value of starts-after", - (job.getSubmitTime() >= absoluteStartsAfterTime)); - rpSubmitTimes.add(job.getSubmitTime()); - } - - List skippedJobs = new LinkedList(); - skippedJobs.addAll(gpSubmitTimes); - skippedJobs.removeAll(rpSubmitTimes); - - //check if the skipped job submittime < starts-after - for(Long submitTime : skippedJobs) { - assertTrue("skipped job submit time " + submitTime + - " in the trace is greater " + - "than the specified value of starts-after " - + absoluteStartsAfterTime, - (submitTime < absoluteStartsAfterTime)); - } - } finally { - IOUtils.cleanup(null, inputFileParser, resultParser); - } - } - - static private void jsonFileMatchesGold( - Configuration conf, FileSystem lfs, Path result, Path gold, - Class clazz, String fileDescription) throws IOException { - JsonObjectMapperParser goldParser = - new JsonObjectMapperParser(gold, clazz, conf); - InputStream resultStream = lfs.open(result); - JsonObjectMapperParser resultParser = - new JsonObjectMapperParser(resultStream, clazz); - try { - while (true) { - DeepCompare goldJob = goldParser.getNext(); - DeepCompare resultJob = resultParser.getNext(); - if ((goldJob == null) || (resultJob == null)) { - assertTrue(goldJob == resultJob); - break; - } - - try { - resultJob.deepCompare(goldJob, new TreePath(null, "")); - } catch (DeepInequalityException e) { - String error = e.path.toString(); - - assertFalse(fileDescription + " mismatches: " + error, true); - } - } - } finally { - IOUtils.cleanup(null, goldParser, resultParser); - } - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java deleted file mode 100644 index bb92426f5f..0000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java +++ /dev/null @@ -1,1259 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.tools.rumen; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.MapReduceTestUtil; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat; -import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; -import org.apache.hadoop.mapreduce.jobhistory.JobHistory; -import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent; -import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; -import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; -import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; -import org.apache.hadoop.tools.rumen.TraceBuilder.MyOptions; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestRumenJobTraces { - private static final Log LOG = LogFactory.getLog(TestRumenJobTraces.class); - - @Test - public void testSmallTrace() throws Exception { - performSingleTest("sample-job-tracker-logs.gz", - "job-tracker-logs-topology-output", "job-tracker-logs-trace-output.gz"); - } - - @Test - public void testTruncatedTask() throws Exception { - performSingleTest("truncated-job-tracker-log", "truncated-topology-output", - "truncated-trace-output"); - } - - private void performSingleTest(String jtLogName, String goldTopology, - String goldTrace) throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")).makeQualified( - lfs.getUri(), 
lfs.getWorkingDirectory()); - - final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test"); - final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces"); - lfs.delete(tempDir, true); - - final Path topologyFile = new Path(tempDir, jtLogName + "-topology.json"); - final Path traceFile = new Path(tempDir, jtLogName + "-trace.json"); - - final Path inputFile = new Path(rootInputFile, jtLogName); - - System.out.println("topology result file = " + topologyFile); - System.out.println("trace result file = " + traceFile); - - String[] args = new String[6]; - - args[0] = "-v1"; - - args[1] = "-write-topology"; - args[2] = topologyFile.toString(); - - args[3] = "-write-job-trace"; - args[4] = traceFile.toString(); - - args[5] = inputFile.toString(); - - final Path topologyGoldFile = new Path(rootInputFile, goldTopology); - final Path traceGoldFile = new Path(rootInputFile, goldTrace); - - @SuppressWarnings("deprecation") - HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); - int result = ToolRunner.run(analyzer, args); - assertEquals("Non-zero exit", 0, result); - - TestRumenJobTraces - . jsonFileMatchesGold(conf, topologyFile, - topologyGoldFile, LoggedNetworkTopology.class, "topology"); - TestRumenJobTraces. jsonFileMatchesGold(conf, traceFile, - traceGoldFile, LoggedJob.class, "trace"); - } - - @Test - public void testRumenViaDispatch() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - - final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test"); - final Path tempDir = new Path(rootTempDir, "TestRumenViaDispatch"); - lfs.delete(tempDir, true); - - final Path topologyPath = new Path(tempDir, "dispatch-topology.json"); - final Path tracePath = new Path(tempDir, "dispatch-trace.json"); - - final Path inputPath = - new Path(rootInputPath, "dispatch-sample-v20-jt-log.gz"); - - System.out.println("topology result file = " + topologyPath); - System.out.println("testRumenViaDispatch() trace result file = " + tracePath); - - String demuxerClassName = ConcatenatedInputFilesDemuxer.class.getName(); - - String[] args = - { "-demuxer", demuxerClassName, tracePath.toString(), - topologyPath.toString(), inputPath.toString() }; - - final Path topologyGoldFile = - new Path(rootInputPath, "dispatch-topology-output.json.gz"); - final Path traceGoldFile = - new Path(rootInputPath, "dispatch-trace-output.json.gz"); - - Tool analyzer = new TraceBuilder(); - int result = ToolRunner.run(analyzer, args); - assertEquals("Non-zero exit", 0, result); - - TestRumenJobTraces - . jsonFileMatchesGold(conf, topologyPath, - topologyGoldFile, LoggedNetworkTopology.class, "topology"); - TestRumenJobTraces. 
jsonFileMatchesGold(conf, tracePath, - traceGoldFile, LoggedJob.class, "trace"); - } - - @Test - public void testBracketedCounters() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - - final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test"); - final Path tempDir = new Path(rootTempDir, "TestBracketedCounters"); - lfs.delete(tempDir, true); - - final Path topologyPath = new Path(tempDir, "dispatch-topology.json"); - final Path tracePath = new Path(tempDir, "dispatch-trace.json"); - - final Path inputPath = new Path(rootInputPath, "counters-format-test-logs"); - - System.out.println("topology result file = " + topologyPath); - System.out.println("testBracketedCounters() trace result file = " + tracePath); - - final Path goldPath = - new Path(rootInputPath, "counters-test-trace.json.gz"); - - String[] args = - { tracePath.toString(), topologyPath.toString(), inputPath.toString() }; - - Tool analyzer = new TraceBuilder(); - int result = ToolRunner.run(analyzer, args); - assertEquals("Non-zero exit", 0, result); - - TestRumenJobTraces. jsonFileMatchesGold(conf, tracePath, - goldPath, LoggedJob.class, "trace"); - } - - @Test - public void testHadoop20JHParser() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - - final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test"); - - // history file to be parsed to get events - final Path inputPath = new Path(rootInputPath, "v20-single-input-log.gz"); - - RewindableInputStream ris = getRewindableInputStream(inputPath, conf); - assertNotNull(ris); - - Hadoop20JHParser parser = null; - - try { - assertEquals("Hadoop20JHParser can't parse the test file " + - inputPath, true, Hadoop20JHParser.canParse(ris)); - - ris.rewind(); - parser = new Hadoop20JHParser(ris); - ArrayList seenEvents = new ArrayList(150); - - // this is same as the one in input history file - String jobId = "job_200904211745_0002"; - JobBuilder builder = new JobBuilder(jobId); - - // get events into seenEvents - getHistoryEvents(parser, seenEvents, builder); - - // Validate the events seen by history parser from - // history file v20-single-input-log.gz - validateSeenHistoryEvents(seenEvents, goldLines); - - ParsedJob parsedJob = builder.build(); - // validate the obtainXXX api of ParsedJob, ParsedTask and - // ParsedTaskAttempt - validateParsedJob(parsedJob, 20, 1, true); - } finally { - if (parser != null) { - parser.close(); - } - ris.close(); - } - } - - /** - * Validate the parsing of given history file name. - * - * TODO: Also validate the history file name suffixed with old/stale file - * suffix. 
- * @param jhFileName job history file path - * @param jid JobID - */ - private void validateHistoryFileNameParsing(Path jhFileName, - org.apache.hadoop.mapred.JobID jid) { - JobID extractedJID = - JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName())); - assertEquals("TraceBuilder failed to parse the current JH filename" - + jhFileName, jid, extractedJID); - //TODO test jobhistory filename with old/stale file suffix - } - - /** - * Validate the parsing of given history conf file name. Also validate the - * history conf file name suffixed with old/stale file suffix. - * @param jhConfFileName job history conf file path - * @param jid JobID - */ - private void validateJHConfFileNameParsing(Path jhConfFileName, - org.apache.hadoop.mapred.JobID jid) { - assertTrue("TraceBuilder failed to parse the JH conf filename:" - + jhConfFileName, - JobHistoryUtils.isJobConfXml(jhConfFileName.getName())); - JobID extractedJID = - JobID.forName(JobHistoryUtils.extractJobID(jhConfFileName.getName())); - assertEquals("TraceBuilder failed to parse the current JH conf filename:" - + jhConfFileName, jid, extractedJID); - // Test jobhistory conf filename with old/stale file suffix - jhConfFileName = jhConfFileName.suffix(JobHistory.getOldFileSuffix("123")); - assertTrue("TraceBuilder failed to parse the current JH conf filename" - + " (old suffix):" + jhConfFileName, - JobHistoryUtils.isJobConfXml(jhConfFileName.getName())); - extractedJID = - JobID.forName(JobHistoryUtils.extractJobID(jhConfFileName.getName())); - assertEquals("TraceBuilder failed to parse the JH conf filename" - + "(old-suffix):" + jhConfFileName, - jid, extractedJID); - } - - /** - * Tests if {@link TraceBuilder} can correctly identify and parse different - * versions of jobhistory filenames. 
The testcase checks if - * {@link TraceBuilder} - * - correctly identifies a jobhistory filename without suffix - * - correctly parses a jobhistory filename without suffix to extract out - * the jobid - * - correctly identifies a jobhistory filename with suffix - * - correctly parses a jobhistory filename with suffix to extract out the - * jobid - * - correctly identifies a job-configuration filename stored along with the - * jobhistory files - */ - @Test - public void testJobHistoryFilenameParsing() throws IOException { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - org.apache.hadoop.mapred.JobID jid = - new org.apache.hadoop.mapred.JobID("12345", 1); - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")) - .makeQualified(lfs.getUri(), lfs.getWorkingDirectory()); - - // Check if current jobhistory filenames are detected properly - JobId jobId = TypeConverter.toYarn(jid); - JobIndexInfo info = new JobIndexInfo(0L, 0L, "", "", jobId, 0, 0, ""); - Path jhFilename = new Path(FileNameIndexUtils.getDoneFileName(info)); - validateHistoryFileNameParsing(jhFilename, jid); - - // Check if Pre21 V1 jophistory file names are detected properly - jhFilename = new Path("jt-identifier_" + jid + "_user-name_job-name"); - validateHistoryFileNameParsing(jhFilename, jid); - - // Check if Pre21 V2 jobhistory file names are detected properly - jhFilename = new Path(jid + "_user-name_job-name"); - validateHistoryFileNameParsing(jhFilename, jid); - - // Check if the current jobhistory conf filenames are detected properly - Path jhConfFilename = JobHistory.getConfFile(rootInputDir, jid); - validateJHConfFileNameParsing(jhConfFilename, jid); - - // Check if Pre21 V1 jobhistory conf file names are detected properly - jhConfFilename = new Path("jt-identifier_" + jid + "_conf.xml"); - validateJHConfFileNameParsing(jhConfFilename, jid); - - // Check if Pre21 V2 jobhistory conf file names are detected properly - jhConfFilename = new Path(jid + "_conf.xml"); - validateJHConfFileNameParsing(jhConfFilename, jid); - } - - /** - * Check if processing of input arguments is as expected by passing globbed - * input path - *
   • without -recursive option and - *
  • with -recursive option. - */ - @Test - public void testProcessInputArgument() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - // define the test's root temporary directory - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")) - .makeQualified(lfs.getUri(), lfs.getWorkingDirectory()); - // define the test's root input directory - Path testRootInputDir = new Path(rootTempDir, "TestProcessInputArgument"); - // define the nested input directory - Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4"); - // define the globbed version of the nested input directory - Path globbedInputNestedDir = - lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*")); - try { - lfs.delete(nestedInputDir, true); - - List recursiveInputPaths = new ArrayList(); - List nonRecursiveInputPaths = new ArrayList(); - // Create input files under the given path with multiple levels of - // sub directories - createHistoryLogsHierarchy(nestedInputDir, lfs, recursiveInputPaths, - nonRecursiveInputPaths); - - // Check the case of globbed input path and without -recursive option - List inputs = MyOptions.processInputArgument( - globbedInputNestedDir.toString(), conf, false); - validateHistoryLogPaths(inputs, nonRecursiveInputPaths); - - // Check the case of globbed input path and with -recursive option - inputs = MyOptions.processInputArgument( - globbedInputNestedDir.toString(), conf, true); - validateHistoryLogPaths(inputs, recursiveInputPaths); - - } finally { - lfs.delete(testRootInputDir, true); - } - } - - /** - * Validate if the input history log paths are as expected. - * @param inputs the resultant input paths to be validated - * @param expectedHistoryFileNames the expected input history logs - * @throws IOException - */ - private void validateHistoryLogPaths(List inputs, - List expectedHistoryFileNames) throws IOException { - - System.out.println("\nExpected history files are:"); - for (String historyFile : expectedHistoryFileNames) { - System.out.println(historyFile); - } - System.out.println("\nResultant history files are:"); - List historyLogs = new ArrayList(); - for (Path p : inputs) { - historyLogs.add(p.toUri().getPath()); - System.out.println(p.toUri().getPath()); - } - - assertEquals("Number of history logs found is different from the expected.", - expectedHistoryFileNames.size(), inputs.size()); - - // Verify if all the history logs are expected ones and they are in the - // expected order - assertTrue("Some of the history log files do not match the expected.", - historyLogs.equals(expectedHistoryFileNames)); - } - - /** - * Create history logs under the given path with multiple levels of - * sub directories as shown below. - *
- * Create a file, an empty subdirectory and a nonempty subdirectory
- * <historyDir> under the given input path.
- *
- * The subdirectory <historyDir> contains the following dir structure:
- *   <historyDir>/historyFile1.txt
- *   <historyDir>/historyFile1.gz
- *   <historyDir>/subDir1/historyFile2.txt
- *   <historyDir>/subDir1/historyFile2.gz
- *   <historyDir>/subDir2/historyFile3.txt
- *   <historyDir>/subDir2/historyFile3.gz
- *   <historyDir>/subDir1/subDir11/historyFile4.txt
- *   <historyDir>/subDir1/subDir11/historyFile4.gz
- *   <historyDir>/subDir2/subDir21/
- *
    - * Create the lists of input paths that should be processed by TraceBuilder - * for recursive case and non-recursive case. - * @param nestedInputDir the input history logs directory where history files - * with nested subdirectories are created - * @param fs FileSystem of the input paths - * @param recursiveInputPaths input paths for recursive case - * @param nonRecursiveInputPaths input paths for non-recursive case - * @throws IOException - */ - private void createHistoryLogsHierarchy(Path nestedInputDir, FileSystem fs, - List recursiveInputPaths, List nonRecursiveInputPaths) - throws IOException { - List dirs = new ArrayList(); - // define a file in the nested test input directory - Path inputPath1 = new Path(nestedInputDir, "historyFile.txt"); - // define an empty sub-folder in the nested test input directory - Path emptyDir = new Path(nestedInputDir, "emptyDir"); - // define a nonempty sub-folder in the nested test input directory - Path historyDir = new Path(nestedInputDir, "historyDir"); - - fs.mkdirs(nestedInputDir); - // Create an empty input file - fs.createNewFile(inputPath1); - // Create empty subdir - fs.mkdirs(emptyDir);// let us not create any files under this dir - - fs.mkdirs(historyDir); - dirs.add(historyDir); - - Path subDir1 = new Path(historyDir, "subDir1"); - fs.mkdirs(subDir1); - dirs.add(subDir1); - Path subDir2 = new Path(historyDir, "subDir2"); - fs.mkdirs(subDir2); - dirs.add(subDir2); - - Path subDir11 = new Path(subDir1, "subDir11"); - fs.mkdirs(subDir11); - dirs.add(subDir11); - Path subDir21 = new Path(subDir2, "subDir21"); - fs.mkdirs(subDir21);// let us not create any files under this dir - - int i = 0; - for (Path dir : dirs) { - i++; - Path gzPath = new Path(dir, "historyFile" + i + ".gz"); - Path txtPath = new Path(dir, "historyFile" + i + ".txt"); - fs.createNewFile(txtPath); - fs.createNewFile(gzPath); - recursiveInputPaths.add(gzPath.toUri().getPath()); - recursiveInputPaths.add(txtPath.toUri().getPath()); - if (i == 1) { - nonRecursiveInputPaths.add(gzPath.toUri().getPath()); - nonRecursiveInputPaths.add(txtPath.toUri().getPath()); - } - } - recursiveInputPaths.add(inputPath1.toUri().getPath()); - nonRecursiveInputPaths.add(inputPath1.toUri().getPath()); - } - - /** - * Test if {@link CurrentJHParser} can read events from current JH files. - */ - @Test - public void testCurrentJHParser() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - - final Path tempDir = new Path(rootTempDir, "TestCurrentJHParser"); - lfs.delete(tempDir, true); - - // Run a MR job - // create a MR cluster - conf.setInt(TTConfig.TT_MAP_SLOTS, 1); - conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1); - MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null, - new JobConf(conf)); - - // run a job - Path inDir = new Path(tempDir, "input"); - Path outDir = new Path(tempDir, "output"); - JobHistoryParser parser = null; - RewindableInputStream ris = null; - ArrayList seenEvents = new ArrayList(15); - - try { - JobConf jConf = mrCluster.createJobConf(); - // construct a job with 1 map and 1 reduce task. 
- Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1); - // disable setup/cleanup - job.setJobSetupCleanupNeeded(false); - // set the output format to take care of the _temporary folder - job.setOutputFormatClass(MyOutputFormat.class); - // wait for the job to complete - job.waitForCompletion(false); - - assertTrue("Job failed", job.isSuccessful()); - - JobID id = job.getJobID(); - JobClient jc = new JobClient(jConf); - String user = jc.getAllJobs()[0].getUsername(); - - // get the jobhistory filepath - Path jhPath = - new Path(mrCluster.getJobTrackerRunner().getJobTracker() - .getJobHistoryDir()); - Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user); - // wait for 10 secs for the jobhistory file to move into the done folder - for (int i = 0; i < 100; ++i) { - if (lfs.exists(inputPath)) { - break; - } - TimeUnit.MILLISECONDS.wait(100); - } - - assertTrue("Missing job history file", lfs.exists(inputPath)); - - ris = getRewindableInputStream(inputPath, conf); - - // Test if the JobHistoryParserFactory can detect the parser correctly - parser = JobHistoryParserFactory.getParser(ris); - - // create a job builder - JobBuilder builder = new JobBuilder(id.toString()); - - // get events into seenEvents and also process them using builder - getHistoryEvents(parser, seenEvents, builder); - - // Check against the gold standard - System.out.println("testCurrentJHParser validating using gold std "); - // The list of history events expected when parsing the above job's - // history log file - String[] goldLinesExpected = new String[] { - JSE, JPCE, JIE, JSCE, TSE, ASE, MFE, TFE, TSE, ASE, RFE, TFE, JFE - }; - - validateSeenHistoryEvents(seenEvents, goldLinesExpected); - - // validate resource usage metrics - // get the job counters - Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters(); - - // get the parsed job - ParsedJob parsedJob = builder.build(); - // get the logged job - LoggedJob loggedJob = parsedJob; - // get the logged attempts - LoggedTaskAttempt attempt = - loggedJob.getMapTasks().get(0).getAttempts().get(0); - // get the resource usage metrics - ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics(); - - // check with the actual values - testResourceUsageMetricViaDeepCompare(metrics, - counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue(), - counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue(), - counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(), - counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(), - true); - - // validate the obtainXXX api of ParsedJob, ParsedTask and - // ParsedTaskAttempt - validateParsedJob(parsedJob, 1, 1, false); - } finally { - // stop the MR cluster - mrCluster.shutdown(); - - if (ris != null) { - ris.close(); - } - if (parser != null) { - parser.close(); - } - - // cleanup the filesystem - lfs.delete(tempDir, true); - } - } - - /** - * Verify if the obtainXXX methods of {@link ParsedJob}, {@link ParsedTask} - * and {@link ParsedTaskAttempt} give valid info - */ - private void validateParsedJob(ParsedJob parsedJob, int numMaps, - int numReduces, boolean pre21JobHistory) { - validateParsedJobAPI(parsedJob, numMaps, numReduces, pre21JobHistory); - - List maps = parsedJob.obtainMapTasks(); - for (ParsedTask task : maps) { - validateParsedTask(task); - } - List reduces = parsedJob.obtainReduceTasks(); - for (ParsedTask task : reduces) { - validateParsedTask(task); - } - List others = parsedJob.obtainOtherTasks(); - for (ParsedTask task : others) { - 
validateParsedTask(task); - } - } - - /** Verify if the obtainXXX methods of {@link ParsedJob} give valid info */ - private void validateParsedJobAPI(ParsedJob parsedJob, int numMaps, - int numReduces, boolean pre21JobHistory) { - LOG.info("Validating ParsedJob.obtainXXX api... for " - + parsedJob.getJobID()); - assertNotNull("Job acls in ParsedJob is null", - parsedJob.obtainJobAcls()); - assertNotNull("Job conf path in ParsedJob is null", - parsedJob.obtainJobConfpath()); - - assertNotNull("Map Counters in ParsedJob is null", - parsedJob.obtainMapCounters()); - assertNotNull("Reduce Counters in ParsedJob is null", - parsedJob.obtainReduceCounters()); - assertNotNull("Total Counters in ParsedJob is null", - parsedJob.obtainTotalCounters()); - - assertNotNull("Map Tasks List in ParsedJob is null", - parsedJob.obtainMapTasks()); - assertNotNull("Reduce Tasks List in ParsedJob is null", - parsedJob.obtainReduceTasks()); - assertNotNull("Other Tasks List in ParsedJob is null", - parsedJob.obtainOtherTasks()); - - // 1 map and 1 reduce task should be there - assertEquals("Number of map tasks in ParsedJob is wrong", - numMaps, parsedJob.obtainMapTasks().size()); - assertEquals("Number of reduce tasks in ParsedJob is wrong", - numReduces, parsedJob.obtainReduceTasks().size(), 1); - - // old hadoop20 version history files don't have job-level-map-counters and - // job-level-reduce-counters. Only total counters exist there. - assertTrue("Total Counters in ParsedJob is empty", - parsedJob.obtainTotalCounters().size() > 0); - if (!pre21JobHistory) { - assertTrue("Map Counters in ParsedJob is empty", - parsedJob.obtainMapCounters().size() > 0); - assertTrue("Reduce Counters in ParsedJob is empty", - parsedJob.obtainReduceCounters().size() > 0); - } - } - - /** - * Verify if the obtainXXX methods of {@link ParsedTask} and - * {@link ParsedTaskAttempt} give valid info - */ - private void validateParsedTask(ParsedTask parsedTask) { - validateParsedTaskAPI(parsedTask); - - List attempts = parsedTask.obtainTaskAttempts(); - for (ParsedTaskAttempt attempt : attempts) { - validateParsedTaskAttemptAPI(attempt); - } - } - - /** Verify if the obtainXXX methods of {@link ParsedTask} give valid info */ - private void validateParsedTaskAPI(ParsedTask parsedTask) { - LOG.info("Validating ParsedTask.obtainXXX api... 
for " - + parsedTask.getTaskID()); - assertNotNull("Task counters in ParsedTask is null", - parsedTask.obtainCounters()); - - if (parsedTask.getTaskStatus() - == Pre21JobHistoryConstants.Values.SUCCESS) { - // task counters should not be empty - assertTrue("Task counters in ParsedTask is empty", - parsedTask.obtainCounters().size() > 0); - assertNull("Diagnostic-info is non-null for a succeeded task", - parsedTask.obtainDiagnosticInfo()); - assertNull("Failed-due-to-attemptId is non-null for a succeeded task", - parsedTask.obtainFailedDueToAttemptId()); - } else { - assertNotNull("Diagnostic-info is non-null for a succeeded task", - parsedTask.obtainDiagnosticInfo()); - assertNotNull("Failed-due-to-attemptId is non-null for a succeeded task", - parsedTask.obtainFailedDueToAttemptId()); - } - - List attempts = parsedTask.obtainTaskAttempts(); - assertNotNull("TaskAttempts list in ParsedTask is null", attempts); - assertTrue("TaskAttempts list in ParsedTask is empty", - attempts.size() > 0); - } - - /** - * Verify if the obtainXXX methods of {@link ParsedTaskAttempt} give - * valid info - */ - private void validateParsedTaskAttemptAPI( - ParsedTaskAttempt parsedTaskAttempt) { - LOG.info("Validating ParsedTaskAttempt.obtainXXX api... for " - + parsedTaskAttempt.getAttemptID()); - assertNotNull("Counters in ParsedTaskAttempt is null", - parsedTaskAttempt.obtainCounters()); - - if (parsedTaskAttempt.getResult() - == Pre21JobHistoryConstants.Values.SUCCESS) { - assertTrue("Counters in ParsedTaskAttempt is empty", - parsedTaskAttempt.obtainCounters().size() > 0); - assertNull("Diagnostic-info is non-null for a succeeded taskAttempt", - parsedTaskAttempt.obtainDiagnosticInfo()); - } else { - assertNotNull("Diagnostic-info is non-null for a succeeded taskAttempt", - parsedTaskAttempt.obtainDiagnosticInfo()); - } - assertNotNull("TrackerName in ParsedTaskAttempt is null", - parsedTaskAttempt.obtainTrackerName()); - - assertNotNull("http-port info in ParsedTaskAttempt is null", - parsedTaskAttempt.obtainHttpPort()); - assertNotNull("Shuffle-port info in ParsedTaskAttempt is null", - parsedTaskAttempt.obtainShufflePort()); - } - - @Test - public void testJobConfigurationParser() throws Exception { - - // Validate parser with old mapred config properties from - // sample-conf-file.xml - String[] oldProps1 = { "mapred.job.queue.name", "mapred.job.name", - "mapred.child.java.opts" }; - - validateJobConfParser("sample-conf.file.xml", false); - validateJobConfParser("sample-conf.file.new.xml", true); - } - - private void validateJobConfParser(String confFile, boolean newConfig) - throws Exception { - - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - @SuppressWarnings("deprecation") - final Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")) - .makeQualified(lfs); - - final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test"); - - final Path inputPath = new Path(rootInputPath, confFile); - - InputStream inputConfStream = - new PossiblyDecompressedInputStream(inputPath, conf); - - try { - Properties props = JobConfigurationParser.parse(inputConfStream); - inputConfStream.close(); - - String oldOrNew = newConfig ? 
"New" : "Old"; - assertEquals(oldOrNew + " config property for job queue name is not " - + " extracted properly.", "TheQueue", - JobBuilder.extract(props, JobConfPropertyNames.QUEUE_NAMES - .getCandidates(), null)); - assertEquals(oldOrNew + " config property for job name is not " - + " extracted properly.", "MyMRJob", - JobBuilder.extract(props, JobConfPropertyNames.JOB_NAMES - .getCandidates(), null)); - - validateChildJavaOpts(newConfig, props); - - } finally { - inputConfStream.close(); - } - } - - // Validate child java opts in properties. - // newConfigProperties: boolean that specifies if the config properties to be - // validated are new OR old. - private void validateChildJavaOpts(boolean newConfigProperties, - Properties props) { - if (newConfigProperties) { - assertEquals("New config property " + MRJobConfig.MAP_JAVA_OPTS - + " is not extracted properly.", - "-server -Xmx640m -Djava.net.preferIPv4Stack=true", - JobBuilder.extract(props, JobConfPropertyNames.MAP_JAVA_OPTS_S - .getCandidates(), null)); - assertEquals("New config property " + MRJobConfig.REDUCE_JAVA_OPTS - + " is not extracted properly.", - "-server -Xmx650m -Djava.net.preferIPv4Stack=true", - JobBuilder.extract(props, JobConfPropertyNames.REDUCE_JAVA_OPTS_S - .getCandidates(), null)); - } - else { - // if old property mapred.child.java.opts is set, then extraction of all - // the following 3 properties should give that value. - assertEquals("mapred.child.java.opts is not extracted properly.", - "-server -Xmx640m -Djava.net.preferIPv4Stack=true", - JobBuilder.extract(props, JobConfPropertyNames.TASK_JAVA_OPTS_S - .getCandidates(), null)); - assertEquals("New config property " + MRJobConfig.MAP_JAVA_OPTS - + " is not extracted properly when the old config property " - + "mapred.child.java.opts is set.", - "-server -Xmx640m -Djava.net.preferIPv4Stack=true", - JobBuilder.extract(props, JobConfPropertyNames.MAP_JAVA_OPTS_S - .getCandidates(), null)); - assertEquals("New config property " + MRJobConfig.REDUCE_JAVA_OPTS - + " is not extracted properly when the old config property " - + "mapred.child.java.opts is set.", - "-server -Xmx640m -Djava.net.preferIPv4Stack=true", - JobBuilder.extract(props, JobConfPropertyNames.REDUCE_JAVA_OPTS_S - .getCandidates(), null)); - } - } - - /** - * Test if the {@link JobConfigurationParser} can correctly extract out - * key-value pairs from the job configuration. 
- */ - @Test - public void testJobConfigurationParsing() throws Exception { - final FileSystem lfs = FileSystem.getLocal(new Configuration()); - - final Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")).makeQualified( - lfs.getUri(), lfs.getWorkingDirectory()); - - final Path tempDir = new Path(rootTempDir, "TestJobConfigurationParser"); - lfs.delete(tempDir, true); - - // Add some configuration parameters to the conf - JobConf jConf = new JobConf(false); - String key = "test.data"; - String value = "hello world"; - jConf.set(key, value); - - // create the job conf file - Path jobConfPath = new Path(tempDir.toString(), "job.xml"); - lfs.delete(jobConfPath, false); - DataOutputStream jobConfStream = lfs.create(jobConfPath); - jConf.writeXml(jobConfStream); - jobConfStream.close(); - - // now read the job conf file using the job configuration parser - Properties properties = - JobConfigurationParser.parse(lfs.open(jobConfPath)); - - // check if the required parameter is loaded - assertEquals("Total number of extracted properties (" + properties.size() - + ") doesn't match the expected size of 1 [" - + "JobConfigurationParser]", - 1, properties.size()); - // check if the key is present in the extracted configuration - assertTrue("Key " + key + " is missing in the configuration extracted " - + "[JobConfigurationParser]", - properties.keySet().contains(key)); - // check if the desired property has the correct value - assertEquals("JobConfigurationParser couldn't recover the parameters" - + " correctly", - value, properties.get(key)); - - // Test ZombieJob - LoggedJob job = new LoggedJob(); - job.setJobProperties(properties); - - ZombieJob zjob = new ZombieJob(job, null); - Configuration zconf = zjob.getJobConf(); - // check if the required parameter is loaded - assertEquals("ZombieJob couldn't recover the parameters correctly", - value, zconf.get(key)); - } - - - /** - * Test {@link ResourceUsageMetrics}. 
- */ - @Test - public void testResourceUsageMetrics() throws Exception { - final long cpuUsage = 100; - final long pMemUsage = 200; - final long vMemUsage = 300; - final long heapUsage = 400; - - // test ResourceUsageMetrics's setters - ResourceUsageMetrics metrics = new ResourceUsageMetrics(); - metrics.setCumulativeCpuUsage(cpuUsage); - metrics.setPhysicalMemoryUsage(pMemUsage); - metrics.setVirtualMemoryUsage(vMemUsage); - metrics.setHeapUsage(heapUsage); - // test cpu usage value - assertEquals("Cpu usage values mismatch via set", cpuUsage, - metrics.getCumulativeCpuUsage()); - // test pMem usage value - assertEquals("Physical memory usage values mismatch via set", pMemUsage, - metrics.getPhysicalMemoryUsage()); - // test vMem usage value - assertEquals("Virtual memory usage values mismatch via set", vMemUsage, - metrics.getVirtualMemoryUsage()); - // test heap usage value - assertEquals("Heap usage values mismatch via set", heapUsage, - metrics.getHeapUsage()); - - // test deepCompare() (pass case) - testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, - pMemUsage, heapUsage, true); - - // test deepCompare (fail case) - // test cpu usage mismatch - testResourceUsageMetricViaDeepCompare(metrics, 0, vMemUsage, pMemUsage, - heapUsage, false); - // test pMem usage mismatch - testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 0, - heapUsage, false); - // test vMem usage mismatch - testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, 0, pMemUsage, - heapUsage, false); - // test heap usage mismatch - testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, - pMemUsage, 0, false); - - // define a metric with a fixed value of size() - ResourceUsageMetrics metrics2 = new ResourceUsageMetrics() { - @Override - public int size() { - return -1; - } - }; - metrics2.setCumulativeCpuUsage(cpuUsage); - metrics2.setPhysicalMemoryUsage(pMemUsage); - metrics2.setVirtualMemoryUsage(vMemUsage); - metrics2.setHeapUsage(heapUsage); - - // test with size mismatch - testResourceUsageMetricViaDeepCompare(metrics2, cpuUsage, vMemUsage, - pMemUsage, heapUsage, false); - } - - // test ResourceUsageMetric's deepCompare() method - private static void testResourceUsageMetricViaDeepCompare( - ResourceUsageMetrics metrics, long cpuUsage, - long vMemUsage, long pMemUsage, long heapUsage, - boolean shouldPass) { - ResourceUsageMetrics testMetrics = new ResourceUsageMetrics(); - testMetrics.setCumulativeCpuUsage(cpuUsage); - testMetrics.setPhysicalMemoryUsage(pMemUsage); - testMetrics.setVirtualMemoryUsage(vMemUsage); - testMetrics.setHeapUsage(heapUsage); - - Boolean passed = null; - try { - metrics.deepCompare(testMetrics, new TreePath(null, "")); - passed = true; - } catch (DeepInequalityException die) { - passed = false; - } - - assertEquals("ResourceUsageMetrics deepCompare() failed!", - shouldPass, passed); - } - - /** - * Testing {@link ResourceUsageMetrics} using {@link HadoopLogsAnalyzer}. 
- */ - @Test - @SuppressWarnings("deprecation") - public void testResourceUsageMetricsWithHadoopLogsAnalyzer() - throws IOException { - Configuration conf = new Configuration(); - // get the input trace file - Path rootInputDir = - new Path(System.getProperty("test.tools.input.dir", "")); - Path rootInputSubFolder = new Path(rootInputDir, "rumen/small-trace-test"); - Path traceFile = new Path(rootInputSubFolder, "v20-resource-usage-log.gz"); - - FileSystem lfs = FileSystem.getLocal(conf); - - // define the root test directory - Path rootTempDir = - new Path(System.getProperty("test.build.data", "/tmp")); - - // define output directory - Path outputDir = - new Path(rootTempDir, "testResourceUsageMetricsWithHadoopLogsAnalyzer"); - lfs.delete(outputDir, true); - lfs.deleteOnExit(outputDir); - - // run HadoopLogsAnalyzer - HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); - analyzer.setConf(conf); - Path traceOutput = new Path(outputDir, "trace.json"); - analyzer.run(new String[] {"-write-job-trace", traceOutput.toString(), - "-v1", traceFile.toString()}); - - // test HadoopLogsAnalyzer's output w.r.t ResourceUsageMetrics - // get the logged job - JsonObjectMapperParser traceParser = - new JsonObjectMapperParser(traceOutput, LoggedJob.class, - conf); - - // get the logged job from the output trace file - LoggedJob job = traceParser.getNext(); - LoggedTaskAttempt attempt = job.getMapTasks().get(0).getAttempts().get(0); - ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics(); - - // test via deepCompare() - testResourceUsageMetricViaDeepCompare(metrics, 200, 100, 75, 50, true); - } - - @Test - public void testTopologyBuilder() throws Exception { - final TopologyBuilder subject = new TopologyBuilder(); - - // This 4 comes from - // TaskInProgress.ProgressibleSplitsBlock.burst().size , which - // is invisible here. 
- - int[][] splits = new int[4][]; - - splits[0] = new int[12]; - splits[1] = new int[12]; - splits[2] = new int[12]; - splits[3] = new int[12]; - - for (int j = 0; j < 4; ++j) { - for (int i = 0; i < 12; ++i) { - splits[j][i] = -1; - } - } - - for (int i = 0; i < 6; ++i) { - splits[0][i] = 500000 * i; - splits[1][i] = 300000 * i; - splits[2][i] = 500000; - splits[3][i] = 700000; - } - - // currently we extract no host names from the Properties - subject.process(new Properties()); - - subject.process(new TaskAttemptFinishedEvent(TaskAttemptID - .forName("attempt_200904211745_0003_m_000004_0"), TaskType - .valueOf("MAP"), "STATUS", 1234567890L, - "/194\\.6\\.134\\.64", "cluster50261\\.secondleveldomain\\.com", - "SUCCESS", null)); - subject.process(new TaskAttemptUnsuccessfulCompletionEvent - (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"), - TaskType.valueOf("MAP"), "STATUS", 1234567890L, - "cluster50262\\.secondleveldomain\\.com", - -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits)); - subject.process(new TaskAttemptUnsuccessfulCompletionEvent - (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"), - TaskType.valueOf("MAP"), "STATUS", 1234567890L, - "cluster50263\\.secondleveldomain\\.com", - -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits)); - subject.process(new TaskStartedEvent(TaskID - .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType - .valueOf("MAP"), - "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com")); - - final LoggedNetworkTopology topology = subject.build(); - - List racks = topology.getChildren(); - - assertEquals("Wrong number of racks", 2, racks.size()); - - boolean sawSingleton = false; - boolean sawDoubleton = false; - - for (LoggedNetworkTopology rack : racks) { - List nodes = rack.getChildren(); - if (rack.getName().getValue().endsWith(".64")) { - assertEquals("The singleton rack has the wrong number of elements", 1, - nodes.size()); - sawSingleton = true; - } else if (rack.getName().getValue().endsWith(".80")) { - assertEquals("The doubleton rack has the wrong number of elements", 2, - nodes.size()); - sawDoubleton = true; - } else { - assertTrue("Unrecognized rack name", false); - } - } - - assertTrue("Did not see singleton rack", sawSingleton); - assertTrue("Did not see doubleton rack", sawDoubleton); - } - - static private void jsonFileMatchesGold( - Configuration conf, Path result, Path gold, Class clazz, - String fileDescription) throws IOException { - JsonObjectMapperParser goldParser = - new JsonObjectMapperParser(gold, clazz, conf); - JsonObjectMapperParser resultParser = - new JsonObjectMapperParser(result, clazz, conf); - try { - while (true) { - DeepCompare goldJob = goldParser.getNext(); - DeepCompare resultJob = resultParser.getNext(); - if ((goldJob == null) || (resultJob == null)) { - assertTrue(goldJob == resultJob); - break; - } - - try { - resultJob.deepCompare(goldJob, new TreePath(null, "")); - } catch (DeepInequalityException e) { - String error = e.path.toString(); - - assertFalse(fileDescription + " mismatches: " + error, true); - } - } - } finally { - IOUtils.cleanup(null, goldParser, resultParser); - } - } - - /** - * Creates {@link RewindableInputStream} for the given file path. 
- * @param inputPath the input file path - * @param conf configuration - * @return {@link RewindableInputStream} - * @throws IOException - */ - private RewindableInputStream getRewindableInputStream(Path inputPath, - Configuration conf) throws IOException { - - PossiblyDecompressedInputStream in = - new PossiblyDecompressedInputStream(inputPath, conf); - - return new RewindableInputStream(in, BUFSIZE); - } - - /** - * Allows given history parser to parse the history events and places in - * the given list - * @param parser the job history parser - * @param events the job history events seen while parsing - * @throws IOException - */ - private void getHistoryEvents(JobHistoryParser parser, - ArrayList events, JobBuilder builder) throws IOException { - HistoryEvent e; - while ((e = parser.nextEvent()) != null) { - String eventString = e.getClass().getSimpleName(); - System.out.println(eventString); - events.add(eventString); - if (builder != null) { - builder.process(e); - } - } - } - - /** - * Validate if history events seen are as expected - * @param seenEvents the list of history events seen - * @param goldLinesExpected the expected history events - */ - private void validateSeenHistoryEvents(ArrayList seenEvents, - String[] goldLinesExpected) { - - // Check the output with gold std - assertEquals("Number of events expected is different from the events seen" - + " by the history parser.", - goldLinesExpected.length, seenEvents.size()); - - int index = 0; - for (String goldLine : goldLinesExpected) { - assertEquals("History Event mismatch at line " + (index + 1), - goldLine, seenEvents.get(index)); - index++; - } - } - - final static int BUFSIZE = 8192; // 8K - - // Any Map Reduce Job History Event should be 1 of the following 16 - final static String JSE = "JobSubmittedEvent"; - final static String JPCE = "JobPriorityChangeEvent"; - final static String JSCE = "JobStatusChangedEvent"; - final static String JIE = "JobInitedEvent"; - final static String JICE = "JobInfoChangeEvent"; - static String TSE = "TaskStartedEvent"; - static String ASE = "TaskAttemptStartedEvent"; - static String AFE = "TaskAttemptFinishedEvent"; - static String MFE = "MapAttemptFinishedEvent"; - static String TUE = "TaskUpdatedEvent"; - static String TFE = "TaskFinishedEvent"; - static String JUCE = "JobUnsuccessfulCompletionEvent"; - static String RFE = "ReduceAttemptFinishedEvent"; - static String AUCE = "TaskAttemptUnsuccessfulCompletionEvent"; - static String TFLE = "TaskFailedEvent"; - static String JFE = "JobFinishedEvent"; - - // The expected job history events(in order) when parsing - // the job history file v20-single-input-log.gz - final static String[] goldLines = new String[] { - JSE, JPCE, JSCE, JIE, JICE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, TSE, - TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, TSE, - TSE, TSE, TSE, TSE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, - TFE, ASE, AFE, MFE, TUE, TFE, TSE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, - MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, - AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, - ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AUCE, ASE, AFE, - MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, - AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, - ASE, AFE, MFE, TUE, TFE, ASE, AFE, MFE, TUE, TFE, ASE, AFE, RFE, TUE, - TFE, TSE, ASE, AFE, MFE, TUE, TFE, JSCE, JFE - }; - -} diff --git 
a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java deleted file mode 100644 index 306d1ba486..0000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java +++ /dev/null @@ -1,338 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.tools.rumen; - -import java.util.List; -import java.util.ArrayList; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.TaskStatus.State; -import org.apache.hadoop.mapreduce.TaskType; - -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -public class TestZombieJob { - final double epsilon = 0.01; - private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 }; - private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 }; - private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 }; - private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182, - 0.0013027618551328818, 2.605523710265763E-4 }; - - private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L, - 1564L, 1234L }; - private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L, - 1514383L, 139391, 1234L }; - - List loggedJobs = new ArrayList(); - List jobStories = new ArrayList(); - - @Before - public void setUp() throws Exception { - final Configuration conf = new Configuration(); - final FileSystem lfs = FileSystem.getLocal(conf); - - final Path rootInputDir = new Path( - System.getProperty("test.tools.input.dir", "")).makeQualified(lfs); - final Path rootInputFile = new Path(rootInputDir, "rumen/zombie"); - - ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile, - "input-trace.json"), new ZombieCluster(new Path(rootInputFile, - "input-topology.json"), null, conf), conf); - - JobStory job = null; - for (int i = 0; i < 4; i++) { - job = parser.getNextJob(); - ZombieJob zJob = (ZombieJob) job; - LoggedJob loggedJob = zJob.getLoggedJob(); - System.out.println(i + ":" + job.getNumberMaps() + "m, " - + job.getNumberReduces() + "r"); - System.out - .println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype()); - - System.out.println("Input Splits -- " + job.getInputSplits().length - + ", " + job.getNumberMaps()); - - System.out.println("Successful Map CDF -------"); - for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) { - System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() - + "--" + cdf.getMaximum()); - for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) { - 
System.out.println(" " + ranking.getRelativeRanking() + ":" - + ranking.getDatum()); - } - } - System.out.println("Failed Map CDF -----------"); - for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) { - System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() - + "--" + cdf.getMaximum()); - for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) { - System.out.println(" " + ranking.getRelativeRanking() + ":" - + ranking.getDatum()); - } - } - System.out.println("Successful Reduce CDF ----"); - LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF(); - System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--" - + cdf.getMaximum()); - for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) { - System.out.println(" " + ranking.getRelativeRanking() + ":" - + ranking.getDatum()); - } - System.out.println("Failed Reduce CDF --------"); - cdf = loggedJob.getFailedReduceAttemptCDF(); - System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--" - + cdf.getMaximum()); - for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) { - System.out.println(" " + ranking.getRelativeRanking() + ":" - + ranking.getDatum()); - } - System.out.print("map attempts to success -- "); - for (double p : loggedJob.getMapperTriesToSucceed()) { - System.out.print(p + ", "); - } - System.out.println(); - System.out.println("==============="); - - loggedJobs.add(loggedJob); - jobStories.add(job); - } - } - - @Test - public void testFirstJob() { - // 20th job seems reasonable: "totalMaps":329,"totalReduces":101 - // successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown - // failed map: 0-0-0-1 - // successful reduce: 99 failed reduce: 13 - // map attempts to success -- 0.9969879518072289, 0.0030120481927710845, - JobStory job = jobStories.get(0); - assertEquals(1, job.getNumberMaps()); - assertEquals(1, job.getNumberReduces()); - - // get splits - - TaskAttemptInfo taInfo = null; - long expectedRuntime = 2423; - // get a succeeded map task attempt, expect the exact same task attempt - taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1); - assertEquals(expectedRuntime, taInfo.getRuntime()); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a succeeded map attempt, but reschedule with different locality. - taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - expectedRuntime = 97502; - // get a succeeded reduce task attempt, expect the exact same task attempt - taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a failed reduce task attempt, expect the exact same task attempt - taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a non-exist reduce task attempt, expect a made-up task attempt - // TODO fill in test case - } - - @Test - public void testSecondJob() { - // 7th job has many failed tasks. 
- // 3204 m, 0 r - // successful maps 497-586-23-1, failed maps 0-0-0-2714 - // map attempts to success -- 0.8113600833767587, 0.18707660239708182, - // 0.0013027618551328818, 2.605523710265763E-4, - JobStory job = jobStories.get(1); - assertEquals(20, job.getNumberMaps()); - assertEquals(1, job.getNumberReduces()); - - TaskAttemptInfo taInfo = null; - // get a succeeded map task attempt - taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a succeeded map task attempt, with different locality - taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a failed map task attempt - taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1); - assertEquals(1927, taInfo.getRuntime()); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - - // get a failed map task attempt, with different locality - // TODO: this test does not make sense here, because I don't have - // available data set. - } - - @Test - public void testFourthJob() { - // 7th job has many failed tasks. - // 3204 m, 0 r - // successful maps 497-586-23-1, failed maps 0-0-0-2714 - // map attempts to success -- 0.8113600833767587, 0.18707660239708182, - // 0.0013027618551328818, 2.605523710265763E-4, - JobStory job = jobStories.get(3); - assertEquals(131, job.getNumberMaps()); - assertEquals(47, job.getNumberReduces()); - - TaskAttemptInfo taInfo = null; - // get a succeeded map task attempt - long runtime = 5268; - taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - assertEquals(runtime, taInfo.getRuntime()); - - // get a succeeded map task attempt, with different locality - taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - assertEquals(runtime, taInfo.getRuntime() / 2); - taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0); - assertEquals(State.SUCCEEDED, taInfo.getRunState()); - assertEquals((long) (runtime / 1.5), taInfo.getRuntime()); - - // get a failed map task attempt - taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1); - assertEquals(18592, taInfo.getRuntime()); - assertEquals(State.FAILED, taInfo.getRunState()); - } - - @Test - public void testRecordIOInfo() { - JobStory job = jobStories.get(3); - - TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113); - - TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0); - - assertEquals(mapTaskCounts[0], mapTask.getInputBytes()); - assertEquals(mapTaskCounts[1], mapTask.getInputRecords()); - assertEquals(mapTaskCounts[2], mapTask.getOutputBytes()); - assertEquals(mapTaskCounts[3], mapTask.getOutputRecords()); - assertEquals(mapTaskCounts[4], mapTask.getTaskMemory()); - - assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes()); - assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords()); - assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes()); - assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords()); - assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory()); - } - - @Test - public void testMakeUpInfo() { - // get many non-exist tasks - // total 3204 map tasks, 3300 is a non-exist task. 
- checkMakeUpTask(jobStories.get(3), 113, 1); - } - - private void checkMakeUpTask(JobStory job, int taskNumber, int locality) { - TaskAttemptInfo taInfo = null; - - Histogram sampleSucceeded = new Histogram(); - Histogram sampleFailed = new Histogram(); - List sampleAttempts = new ArrayList(); - for (int i = 0; i < 100000; i++) { - int attemptId = 0; - while (true) { - taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, 1); - if (taInfo.getRunState() == State.SUCCEEDED) { - sampleSucceeded.enter(taInfo.getRuntime()); - break; - } - sampleFailed.enter(taInfo.getRuntime()); - attemptId++; - } - sampleAttempts.add(attemptId); - } - - // check state distribution - int[] countTries = new int[] { 0, 0, 0, 0 }; - for (int attempts : sampleAttempts) { - assertTrue(attempts < 4); - countTries[attempts]++; - } - /* - * System.out.print("Generated map attempts to success -- "); for (int - * count: countTries) { System.out.print((double)count/sampleAttempts.size() - * + ", "); } System.out.println(); System.out.println("==============="); - */ - for (int i = 0; i < 4; i++) { - int count = countTries[i]; - double p = (double) count / sampleAttempts.size(); - assertTrue(expectedPs[i] - p < epsilon); - } - - // check succeeded attempts runtime distribution - long[] expectedCDF = succeededCDF; - LoggedDiscreteCDF cdf = new LoggedDiscreteCDF(); - cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100); - /* - * System.out.println("generated succeeded map runtime distribution"); - * System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--" - * + cdf.getMaximum()); for (LoggedSingleRelativeRanking ranking: - * cdf.getRankings()) { System.out.println(" " + - * ranking.getRelativeRanking() + ":" + ranking.getDatum()); } - */ - assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]); - assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]); - for (int i = 0; i < 3; i++) { - LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i); - assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum()); - } - - // check failed attempts runtime distribution - expectedCDF = failedCDF; - cdf = new LoggedDiscreteCDF(); - cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100); - - System.out.println("generated failed map runtime distribution"); - System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--" - + cdf.getMaximum()); - for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) { - System.out.println(" " + ranking.getRelativeRanking() + ":" - + ranking.getDatum()); - } - assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]); - assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]); - for (int i = 0; i < 3; i++) { - LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i); - assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum()); - } - } - - private void assertRuntimeEqual(long expected, long generated) { - if (expected == 0) { - assertTrue(generated > -1000 && generated < 1000); - } else { - long epsilon = Math.max(expected / 10, 5000); - assertTrue(expected - generated > -epsilon); - assertTrue(expected - generated < epsilon); - } - } - -}