From 8b042bc1e6ae5e18d435d6a184dec1811cc3a513 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Wed, 21 Dec 2016 09:43:17 -0800 Subject: [PATCH] YARN-5976. Update hbase version to 1.2. Contributed by Vrushali C. --- LICENSE.txt | 8 +- hadoop-project/pom.xml | 26 +- .../pom.xml | 142 +------ ...stPhoenixOfflineAggregationWriterImpl.java | 161 -------- .../pom.xml | 30 +- .../PhoenixOfflineAggregationWriterImpl.java | 358 ------------------ .../storage/TimelineSchemaCreator.java | 22 -- 7 files changed, 27 insertions(+), 720 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java diff --git a/LICENSE.txt b/LICENSE.txt index 2183f0e36d..fd07edfd9b 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1643,12 +1643,6 @@ JLine 0.9.94 leveldbjni-all 1.8 Hamcrest Core 1.3 xmlenc Library 0.52 -StringTemplate 4 4.0.7 -ANTLR 3 Tool 3.5 -ANTLR 3 Runtime 3.5 -ANTLR StringTemplate 3.2.1 -ASM All 5.0.2 -sqlline 1.1.8 -------------------------------------------------------------------------------- Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: @@ -1879,7 +1873,7 @@ the Licensor and You. The binary distribution of this product bundles these dependencies under the following license: -jamon-runtime 2.3.1 +jamon-runtime 2.4.1 -------------------------------------------------------------------------------- MOZILLA PUBLIC LICENSE Version 1.1 diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index c9ee793560..a93529287e 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,8 +49,7 @@ 2.11.0 0.8.2.1 - 1.1.3 - 4.7.0-HBase-1.1 + 1.2.4 2.5.1 ${project.version} @@ -1218,29 +1217,6 @@ test tests - - org.apache.phoenix - phoenix-core - ${phoenix.version} - - - - jline - jline - - - joda-time - joda-time - - - - - org.apache.phoenix - phoenix-core - test-jar - ${phoenix.version} - test - org.apache.hbase hbase-it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index 026ef75111..f151e1d6cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -55,10 +55,6 @@ org.apache.hadoop hadoop-common - - org.apache.phoenix - phoenix-core - @@ -79,6 +75,8 @@ + org.apache.hadoop hadoop-auth @@ -116,18 +114,6 @@ - - org.apache.hadoop - hadoop-yarn-server-common - test - - - org.apache.hadoop - hadoop-common - - - - org.apache.hadoop hadoop-yarn-server-applicationhistoryservice @@ -148,14 +134,14 @@ com.sun.jersey - jersey-core + jersey-client test - com.sun.jersey - jersey-client - test + javax.ws.rs + jsr311-api + 1.1.1 @@ -226,23 +212,6 @@ - - org.apache.hbase - hbase-common - tests - test - - - org.apache.hadoop - hadoop-common - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - - org.apache.hbase hbase-server @@ -278,99 +247,6 @@ - - org.apache.hbase - hbase-it 
- test - tests - - - org.apache.hadoop - hadoop-common - - - org.apache.hadoop - hadoop-auth - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - - - - org.apache.phoenix - phoenix-core - test - - - org.apache.hadoop - hadoop-common - - - org.apache.hadoop - hadoop-auth - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - net.sourceforge.findbugs - annotations - - - - - - org.apache.phoenix - phoenix-core - test-jar - test - - - org.apache.hadoop - hadoop-common - - - org.apache.hadoop - hadoop-auth - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - net.sourceforge.findbugs - annotations - - - - - - - org.mockito - mockito-all - test - - org.apache.hadoop hadoop-common @@ -385,6 +261,8 @@ + org.apache.hadoop hadoop-hdfs @@ -392,6 +270,8 @@ test + org.apache.hadoop hadoop-hdfs @@ -429,6 +309,8 @@ + org.eclipse.jetty diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java deleted file mode 100644 index e34ae90b12..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.ReadOnlyProps; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { - private static PhoenixOfflineAggregationWriterImpl storage; - private static final int BATCH_SIZE = 3; - - @BeforeClass - public static void setup() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - storage = setupPhoenixClusterAndWriterForTest(conf); - } - - @Test(timeout = 90000) - public void testFlowLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); - } - - @Test(timeout = 90000) - public void testUserLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.USER_AGGREGATION); - } - - @AfterClass - public static void cleanup() throws Exception { - storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); - storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); - tearDownMiniCluster(); - } - - private static PhoenixOfflineAggregationWriterImpl - setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) - throws Exception { - Map props = new HashMap<>(); - // Must update config before starting server - props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - Boolean.FALSE.toString()); - props.put("java.security.krb5.realm", ""); - props.put("java.security.krb5.kdc", ""); - props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, - Boolean.FALSE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); - props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - - // Change connection settings for test - conf.set( - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - getUrl()); - PhoenixOfflineAggregationWriterImpl - myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); - myWriter.init(conf); - myWriter.start(); - myWriter.createPhoenixTables(); - return myWriter; - } - - private static TimelineEntity getTestAggregationTimelineEntity() { - TimelineEntity entity = new TimelineEntity(); - String id = "hello1"; - String 
type = "testAggregationType"; - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(1425016501000L); - - TimelineMetric metric = new TimelineMetric(); - metric.setId("HDFS_BYTES_READ"); - metric.addValue(1425016501100L, 8000); - entity.addMetric(metric); - - return entity; - } - - private void testAggregator(OfflineAggregationInfo aggregationInfo) - throws Exception { - // Set up a list of timeline entities and write them back to Phoenix - int numEntity = 1; - TimelineEntities te = new TimelineEntities(); - te.addEntity(getTestAggregationTimelineEntity()); - TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", - "user1", "testFlow", null, 0L, null); - storage.writeAggregatedEntity(context, te, - aggregationInfo); - - // Verify if we're storing all entities - String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); - String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] - +") FROM " + aggregationInfo.getTableName(); - verifySQLWithCount(sql, numEntity, "Number of entities should be "); - // Check metric - sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " - + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; - verifySQLWithCount(sql, numEntity, - "Number of entities with info should be "); - } - - - private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception { - try ( - Statement stmt = - storage.getConnection().createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - assertTrue("Result set empty on statement " + sql, rs.next()); - assertNotNull("Fail to execute query " + sql, rs); - assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); - } catch (SQLException se) { - fail("SQL exception on query: " + sql - + " With exception message: " + se.getLocalizedMessage()); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 6b535c3c22..69c4c7eaa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -91,8 +91,13 @@ - com.sun.jersey - jersey-core + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind @@ -120,6 +125,12 @@ commons-csv + + javax.ws.rs + jsr311-api + 1.1.1 + + org.apache.hbase hbase-common @@ -181,21 +192,6 @@ - - org.apache.phoenix - phoenix-core - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - net.sourceforge.findbugs - annotations - - - - org.apache.hadoop diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java deleted file mode 100644 index 130cb6cc1f..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.phoenix.util.PropertiesUtil; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -/** - * Offline aggregation Phoenix storage. This storage currently consists of two - * aggregation tables, one for flow level aggregation and one for user level - * aggregation. - * - * Example table record: - * - *
- * |---------------------------|
- * |  Primary   | Column Family|
- * |  key       | metrics      |
- * |---------------------------|
- * | row_key    | metricId1:   |
- * |            | metricValue1 |
- * |            | @timestamp1  |
- * |            |              |
- * |            | metriciD1:   |
- * |            | metricValue2 |
- * |            | @timestamp2  |
- * |            |              |
- * |            | metricId2:   |
- * |            | metricValue1 |
- * |            | @timestamp2  |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |            |              |
- * |---------------------------|
- * </pre>
- * - * For the flow aggregation table, the primary key contains user, cluster, and - * flow id. For user aggregation table,the primary key is user. - * - * Metrics column family stores all aggregated metrics for each record. - */ -@Private -@Unstable -public class PhoenixOfflineAggregationWriterImpl - extends OfflineAggregationWriter { - - private static final Log LOG - = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class); - private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER - = "timeline_cf_placeholder"; - - /** Default Phoenix JDBC driver name. */ - private static final String DRIVER_CLASS_NAME - = "org.apache.phoenix.jdbc.PhoenixDriver"; - - /** Default Phoenix timeline config column family. */ - private static final String METRIC_COLUMN_FAMILY = "m."; - /** Default Phoenix timeline info column family. */ - private static final String INFO_COLUMN_FAMILY = "i."; - /** Default separator for Phoenix storage. */ - private static final String AGGREGATION_STORAGE_SEPARATOR = ";"; - - /** Connection string to the deployed Phoenix cluster. */ - private String connString = null; - private Properties connProperties = new Properties(); - - public PhoenixOfflineAggregationWriterImpl(Properties prop) { - super(PhoenixOfflineAggregationWriterImpl.class.getName()); - connProperties = PropertiesUtil.deepCopy(prop); - } - - public PhoenixOfflineAggregationWriterImpl() { - super(PhoenixOfflineAggregationWriterImpl.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - Class.forName(DRIVER_CLASS_NAME); - // so check it here and only read in the config if it's not overridden. - connString = - conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT); - super.init(conf); - } - - @Override - public TimelineWriteResponse writeAggregatedEntity( - TimelineCollectorContext context, TimelineEntities entities, - OfflineAggregationInfo info) throws IOException { - TimelineWriteResponse response = new TimelineWriteResponse(); - String sql = "UPSERT INTO " + info.getTableName() - + " (" + StringUtils.join(info.getPrimaryKeyList(), ",") - + ", created_time, metric_names) " - + "VALUES (" - + StringUtils.repeat("?,", info.getPrimaryKeyList().length) - + "?, ?)"; - if (LOG.isDebugEnabled()) { - LOG.debug("TimelineEntity write SQL: " + sql); - } - - try (Connection conn = getConnection(); - PreparedStatement ps = conn.prepareStatement(sql)) { - for (TimelineEntity entity : entities.getEntities()) { - HashMap formattedMetrics = new HashMap<>(); - if (entity.getMetrics() != null) { - for (TimelineMetric m : entity.getMetrics()) { - formattedMetrics.put(m.getId(), m); - } - } - int idx = info.setStringsForPrimaryKey(ps, context, null, 1); - ps.setLong(idx++, entity.getCreatedTime()); - ps.setString(idx++, - StringUtils.join(formattedMetrics.keySet().toArray(), - AGGREGATION_STORAGE_SEPARATOR)); - ps.execute(); - - storeEntityVariableLengthFields(entity, formattedMetrics, context, conn, - info); - - conn.commit(); - } - } catch (SQLException se) { - LOG.error("Failed to add entity to Phoenix " + se.getMessage()); - throw new IOException(se); - } catch (Exception e) { - LOG.error("Exception on getting connection: " + e.getMessage()); - throw new IOException(e); - } - return response; - } - - /** - * Create Phoenix tables for offline aggregation storage if the tables do not - * exist. - * - * @throws IOException if any problem happens while creating Phoenix tables. 
- */ - public void createPhoenixTables() throws IOException { - // Create tables if necessary - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - // Table schema defined as in YARN-3817. - String sql = "CREATE TABLE IF NOT EXISTS " - + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "flow_name VARCHAR NOT NULL, " - + "created_time UNSIGNED_LONG, " - + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER - + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " - + "CONSTRAINT pk PRIMARY KEY(" - + "user, cluster, flow_name))"; - stmt.executeUpdate(sql); - sql = "CREATE TABLE IF NOT EXISTS " - + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "created_time UNSIGNED_LONG, " - + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER - + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " - + "CONSTRAINT pk PRIMARY KEY(user, cluster))"; - stmt.executeUpdate(sql); - conn.commit(); - } catch (SQLException se) { - LOG.error("Failed in init data " + se.getLocalizedMessage()); - throw new IOException(se); - } - return; - } - - // Utility functions - @Private - @VisibleForTesting - Connection getConnection() throws IOException { - Connection conn; - try { - conn = DriverManager.getConnection(connString, connProperties); - conn.setAutoCommit(false); - } catch (SQLException se) { - LOG.error("Failed to connect to phoenix server! " - + se.getLocalizedMessage()); - throw new IOException(se); - } - return conn; - } - - // WARNING: This method will permanently drop a table! - @Private - @VisibleForTesting - void dropTable(String tableName) throws Exception { - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - String sql = "DROP TABLE " + tableName; - stmt.executeUpdate(sql); - } catch (SQLException se) { - LOG.error("Failed in dropping entity table " + se.getLocalizedMessage()); - throw se; - } - } - - private static class DynamicColumns { - static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY"; - static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR"; - private String columnFamilyPrefix; - private String type; - private Set columns; - - public DynamicColumns(String columnFamilyPrefix, String type, - Set keyValues) { - this.columnFamilyPrefix = columnFamilyPrefix; - this.columns = keyValues; - this.type = type; - } - } - - private static StringBuilder appendColumnsSQL( - StringBuilder colNames, DynamicColumns cfInfo) { - // Prepare the sql template by iterating through all keys - for (K key : cfInfo.columns) { - colNames.append(",").append(cfInfo.columnFamilyPrefix) - .append(key.toString()).append(cfInfo.type); - } - return colNames; - } - - private static int setValuesForColumnFamily( - PreparedStatement ps, Map keyValues, int startPos, - boolean converToBytes) throws SQLException { - int idx = startPos; - for (Map.Entry entry : keyValues.entrySet()) { - V value = entry.getValue(); - if (value instanceof Collection) { - ps.setString(idx++, StringUtils.join( - (Collection) value, AGGREGATION_STORAGE_SEPARATOR)); - } else { - if (converToBytes) { - try { - ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue())); - } catch (IOException ie) { - LOG.error("Exception in converting values into bytes " - + ie.getMessage()); - throw new SQLException(ie); - } - } else { - ps.setString(idx++, value.toString()); - } - } - } - return idx; - } - - private static int 
setBytesForColumnFamily(
-      PreparedStatement ps, Map keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, true);
-  }
-
-  private static int setStringsForColumnFamily(
-      PreparedStatement ps, Map keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, false);
-  }
-
-  private static void storeEntityVariableLengthFields(TimelineEntity entity,
-      Map formattedMetrics,
-      TimelineCollectorContext context, Connection conn,
-      OfflineAggregationInfo aggregationInfo) throws SQLException {
-    int numPlaceholders = 0;
-    StringBuilder columnDefs = new StringBuilder(
-        StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
-    if (formattedMetrics != null && formattedMetrics.size() > 0) {
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
-          formattedMetrics.keySet()));
-      numPlaceholders += formattedMetrics.keySet().size();
-    }
-    if (numPlaceholders == 0) {
-      return;
-    }
-    StringBuilder placeholders = new StringBuilder();
-    placeholders.append(
-        StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
-    // numPlaceholders >= 1 now
-    placeholders.append("?")
-        .append(StringUtils.repeat(",?", numPlaceholders - 1));
-    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
-        .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
-        .append(") VALUES(").append(placeholders).append(")").toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL statement for variable length fields: "
-          + sqlVariableLengthFields);
-    }
-    // Use try with resource statement for the prepared statement
-    try (PreparedStatement psVariableLengthFields =
-        conn.prepareStatement(sqlVariableLengthFields)) {
-      int idx = aggregationInfo.setStringsForPrimaryKey(
-          psVariableLengthFields, context, null, 1);
-      if (formattedMetrics != null && formattedMetrics.size() > 0) {
-        idx = setBytesForColumnFamily(
-            psVariableLengthFields, formattedMetrics, idx);
-      }
-      psVariableLengthFields.execute();
-    }
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 33f5449fae..9369d6af5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -59,7 +59,6 @@ private TimelineSchemaCreator() {
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG =
       LogFactory.getLog(TimelineSchemaCreator.class);
-  private static final String PHOENIX_OPTION_SHORT = "p";
   private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
   private static final String APP_TABLE_NAME_SHORT = "a";
   private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
@@ -116,22 +115,6 @@ public static void main(String[] args) throws Exception {
       exceptions.add(e);
     }

-    // Create Phoenix data schema if needed
-    if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
-      Configuration phoenixConf = new Configuration();
-      try {
-        PhoenixOfflineAggregationWriterImpl phoenixWriter =
-            new PhoenixOfflineAggregationWriterImpl();
-        phoenixWriter.init(phoenixConf);
-        phoenixWriter.start();
-        phoenixWriter.createPhoenixTables();
-        phoenixWriter.stop();
-        LOG.info("Successfully created Phoenix offline aggregation schema. ");
-      } catch (IOException e) {
-        LOG.error("Error in creating phoenix tables: " + e.getMessage());
-        exceptions.add(e);
-      }
-    }
     if (exceptions.size() > 0) {
       LOG.warn("Schema creation finished with the following exceptions");
       for (Exception e : exceptions) {
@@ -181,11 +164,6 @@ private static CommandLine parseArgs(String[] args) throws ParseException {

     // Options without an argument
     // No need to set arg name since we do not need an argument here
-    o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
-        "create Phoenix offline aggregation tables");
-    o.setRequired(false);
-    options.addOption(o);
-
     o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", false,
         "skip existing Hbase tables and continue to create new tables");
     o.setRequired(false);