YARN-3529. Added mini HBase cluster and Phoenix support to timeline service v2 unit tests. Contributed by Li Lu.

This commit is contained in:
Zhijie Shen 2015-05-12 13:53:38 -07:00 committed by Sangjin Lee
parent 41fb5c7381
commit 51d092faef
4 changed files with 159 additions and 39 deletions

View File

@ -49,6 +49,8 @@
<xerces.jdiff.version>2.11.0</xerces.jdiff.version> <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
<kafka.version>0.8.2.1</kafka.version> <kafka.version>0.8.2.1</kafka.version>
<hbase.version>1.0.1</hbase.version>
<phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version> <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
<commons-daemon.version>1.0.13</commons-daemon.version> <commons-daemon.version>1.0.13</commons-daemon.version>
@ -1047,6 +1049,58 @@
<artifactId>jsonassert</artifactId> <artifactId>jsonassert</artifactId>
<version>1.3.0</version> <version>1.3.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
<exclusions>
<!-- Exclude jline from here -->
<exclusion>
<artifactId>jline</artifactId>
<groupId>jline</groupId>
</exclusion>
<exclusion>
<artifactId>joda-time</artifactId>
<groupId>joda-time</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<type>test-jar</type>
<version>${phoenix.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.apache.kerby</groupId> <groupId>org.apache.kerby</groupId>

View File

@ -124,19 +124,42 @@
<dependency> <dependency>
<groupId>org.apache.phoenix</groupId> <groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId> <artifactId>phoenix-core</artifactId>
<version>4.3.0</version>
<exclusions>
<!-- Exclude jline from here -->
<exclusion>
<artifactId>jline</artifactId>
<groupId>jline</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>guava</artifactId> <artifactId>hbase-client</artifactId>
</dependency> </dependency>
<!-- for unit tests only -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<scope>test</scope>
<optional>true</optional>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
@ -43,6 +44,7 @@
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.Set; import java.util.Set;
@Private @Private
@ -50,6 +52,13 @@
public class PhoenixTimelineWriterImpl extends AbstractService public class PhoenixTimelineWriterImpl extends AbstractService
implements TimelineWriter { implements TimelineWriter {
public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
= YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ "writer.phoenix.connectionString";
public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT
= "jdbc:phoenix:localhost:2181:/hbase";
private static final Log LOG private static final Log LOG
= LogFactory.getLog(PhoenixTimelineWriterImpl.class); = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
@ -90,7 +99,10 @@ public class PhoenixTimelineWriterImpl extends AbstractService
private static final String PHOENIX_STORAGE_SEPARATOR = ";"; private static final String PHOENIX_STORAGE_SEPARATOR = ";";
/** Connection string to the deployed Phoenix cluster */ /** Connection string to the deployed Phoenix cluster */
static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase"; @VisibleForTesting
String connString = null;
@VisibleForTesting
Properties connProperties = new Properties();
PhoenixTimelineWriterImpl() { PhoenixTimelineWriterImpl() {
super((PhoenixTimelineWriterImpl.class.getName())); super((PhoenixTimelineWriterImpl.class.getName()));
@ -98,6 +110,10 @@ public class PhoenixTimelineWriterImpl extends AbstractService
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
// so check it here and only read in the config if it's not overridden.
connString =
conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT);
createTables(); createTables();
super.init(conf); super.init(conf);
} }
@ -174,11 +190,11 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
// Utility functions // Utility functions
@Private @Private
@VisibleForTesting @VisibleForTesting
static Connection getConnection() throws IOException { Connection getConnection() throws IOException {
Connection conn; Connection conn;
try { try {
Class.forName(DRIVER_CLASS_NAME); Class.forName(DRIVER_CLASS_NAME);
conn = DriverManager.getConnection(CONN_STRING); conn = DriverManager.getConnection(connString, connProperties);
conn.setAutoCommit(false); conn.setAutoCommit(false);
} catch (SQLException se) { } catch (SQLException se) {
LOG.error("Failed to connect to phoenix server! " LOG.error("Failed to connect to phoenix server! "

View File

@ -23,30 +23,37 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After; import org.junit.AfterClass;
import org.junit.Before; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class TestPhoenixTimelineWriterImpl { import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
private PhoenixTimelineWriterImpl writer;
@Before public class TestPhoenixTimelineWriterImpl extends BaseTest {
public void setup() throws Exception { private static PhoenixTimelineWriterImpl writer;
// TODO: launch a miniphoenix cluster, or else we're directly operating on private static final int BATCH_SIZE = 3;
// the active Phoenix cluster
@BeforeClass
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
writer = createPhoenixWriter(conf); writer = setupPhoenixClusterAndWriterForTest(conf);
} }
@Ignore @Test(timeout = 90000)
@Test
public void testPhoenixWriterBasic() throws Exception { public void testPhoenixWriterBasic() throws Exception {
// Set up a list of timeline entities and write them back to Phoenix // Set up a list of timeline entities and write them back to Phoenix
int numEntity = 12; int numEntity = 12;
@ -91,19 +98,39 @@ public void testPhoenixWriterBasic() throws Exception {
verifySQLWithCount(sql, (numEntity / 4), "Number of events should be "); verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
} }
@After @AfterClass
public void cleanup() throws Exception { public static void cleanup() throws Exception {
// Note: it is assumed that we're working on a test only cluster, or else
// this cleanup process will drop the entity table.
writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME); writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME); writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME); writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
writer.serviceStop(); writer.serviceStop();
tearDownMiniCluster();
} }
private static PhoenixTimelineWriterImpl createPhoenixWriter( private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
YarnConfiguration conf) throws Exception{ YarnConfiguration conf) throws Exception{
Map<String, String> props = new HashMap<>();
// Must update config before starting server
props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
Boolean.FALSE.toString());
props.put("java.security.krb5.realm", "");
props.put("java.security.krb5.kdc", "");
props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
Boolean.FALSE.toString());
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
// Make a small batch size to test multiple calls to reserve sequences
props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
Long.toString(BATCH_SIZE));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl(); PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
// Change connection settings for test
conf.set(
PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
getUrl());
myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
myWriter.serviceInit(conf); myWriter.serviceInit(conf);
return myWriter; return myWriter;
} }
@ -112,7 +139,7 @@ private void verifySQLWithCount(String sql, int targetCount, String message)
throws Exception { throws Exception {
try ( try (
Statement stmt = Statement stmt =
PhoenixTimelineWriterImpl.getConnection().createStatement(); writer.getConnection().createStatement();
ResultSet rs = stmt.executeQuery(sql)) { ResultSet rs = stmt.executeQuery(sql)) {
assertTrue("Result set empty on statement " + sql, rs.next()); assertTrue("Result set empty on statement " + sql, rs.next());
assertNotNull("Fail to execute query " + sql, rs); assertNotNull("Fail to execute query " + sql, rs);