From 51d092faef7af91fd315d0f0e589f63cbb48592c Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Tue, 12 May 2015 13:53:38 -0700 Subject: [PATCH] YARN-3529. Added mini HBase cluster and Phoenix support to timeline service v2 unit tests. Contributed by Li Lu. --- hadoop-project/pom.xml | 64 ++++++++++++++++-- .../pom.xml | 47 ++++++++++---- .../storage/PhoenixTimelineWriterImpl.java | 22 ++++++- .../TestPhoenixTimelineWriterImpl.java | 65 +++++++++++++------ 4 files changed, 159 insertions(+), 39 deletions(-) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index f97a882724..12043e2532 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,6 +49,8 @@ 2.11.0 0.8.2.1 + 1.0.1 + 4.5.0-SNAPSHOT ${project.version} 1.0.13 @@ -1042,11 +1044,63 @@ - - org.skyscreamer - jsonassert - 1.3.0 - + + org.skyscreamer + jsonassert + 1.3.0 + + + org.apache.hbase + hbase-client + ${hbase.version} + + + org.apache.phoenix + phoenix-core + ${phoenix.version} + + + + jline + jline + + + joda-time + joda-time + + + + + org.apache.phoenix + phoenix-core + test-jar + ${phoenix.version} + test + + + org.apache.hbase + hbase-it + ${hbase.version} + test + tests + + + org.apache.hbase + hbase-testing-util + ${hbase.version} + test + true + + + org.jruby + jruby-complete + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.kerby diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 18618030de..919b711db4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -122,21 +122,44 @@ - org.apache.phoenix - phoenix-core - 4.3.0 - - - - jline - jline - - + org.apache.phoenix + phoenix-core - com.google.guava - guava + org.apache.hbase + hbase-client + + + 
org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs + test-jar + test + + + org.apache.phoenix + phoenix-core + test-jar + test + + + org.apache.hbase + hbase-it + test + tests + + + org.apache.hbase + hbase-testing-util + test + true + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java index af8a233cfd..5b4442ceb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.Set; @Private @@ -50,6 +52,13 @@ public class PhoenixTimelineWriterImpl extends AbstractService implements TimelineWriter { + public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR + = YarnConfiguration.TIMELINE_SERVICE_PREFIX + + "writer.phoenix.connectionString"; + + public static final String 
TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR_DEFAULT + = "jdbc:phoenix:localhost:2181:/hbase"; + private static final Log LOG = LogFactory.getLog(PhoenixTimelineWriterImpl.class); private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER @@ -90,7 +99,10 @@ public class PhoenixTimelineWriterImpl extends AbstractService private static final String PHOENIX_STORAGE_SEPARATOR = ";"; /** Connection string to the deployed Phoenix cluster */ - static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase"; + @VisibleForTesting + String connString = null; + @VisibleForTesting + Properties connProperties = new Properties(); PhoenixTimelineWriterImpl() { super((PhoenixTimelineWriterImpl.class.getName())); @@ -98,6 +110,10 @@ public class PhoenixTimelineWriterImpl extends AbstractService @Override protected void serviceInit(Configuration conf) throws Exception { + // connString may already be set (e.g., by tests), so check it here and only read in the config if it's not overridden. + connString = + conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR, + TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR_DEFAULT); createTables(); super.init(conf); } @@ -174,11 +190,11 @@ public TimelineWriteResponse aggregate(TimelineEntity data, // Utility functions @Private @VisibleForTesting - static Connection getConnection() throws IOException { + Connection getConnection() throws IOException { Connection conn; try { Class.forName(DRIVER_CLASS_NAME); - conn = DriverManager.getConnection(CONN_STRING); + conn = DriverManager.getConnection(connString, connProperties); conn.setAutoCommit(false); } catch (SQLException se) { LOG.error("Failed to connect to phoenix server! 
" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java index a55893eda8..dece83d35f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java @@ -23,30 +23,37 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; -public class TestPhoenixTimelineWriterImpl { - private PhoenixTimelineWriterImpl writer; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - @Before - public void setup() throws Exception { - // TODO: launch a miniphoenix cluster, or else we're directly operating on - // the active Phoenix cluster +public class 
TestPhoenixTimelineWriterImpl extends BaseTest { + private static PhoenixTimelineWriterImpl writer; + private static final int BATCH_SIZE = 3; + + @BeforeClass + public static void setup() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - writer = createPhoenixWriter(conf); + writer = setupPhoenixClusterAndWriterForTest(conf); } - @Ignore - @Test + @Test(timeout = 90000) public void testPhoenixWriterBasic() throws Exception { // Set up a list of timeline entities and write them back to Phoenix int numEntity = 12; @@ -91,28 +98,48 @@ public void testPhoenixWriterBasic() throws Exception { verifySQLWithCount(sql, (numEntity / 4), "Number of events should be "); } - @After - public void cleanup() throws Exception { - // Note: it is assumed that we're working on a test only cluster, or else - // this cleanup process will drop the entity table. + @AfterClass + public static void cleanup() throws Exception { writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME); writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME); writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME); writer.serviceStop(); + tearDownMiniCluster(); } - private static PhoenixTimelineWriterImpl createPhoenixWriter( + private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest( YarnConfiguration conf) throws Exception{ + Map props = new HashMap<>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, + Boolean.FALSE.toString()); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, + 
Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl(); + // Change connection settings for test + conf.set( + PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR, + getUrl()); + myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES); myWriter.serviceInit(conf); return myWriter; } private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception{ + throws Exception { try ( Statement stmt = - PhoenixTimelineWriterImpl.getConnection().createStatement(); + writer.getConnection().createStatement(); ResultSet rs = stmt.executeQuery(sql)) { assertTrue("Result set empty on statement " + sql, rs.next()); assertNotNull("Fail to execute query " + sql, rs);