diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 67dfeebafe..fdec4fc552 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2099,7 +2099,7 @@ public class YarnConfiguration extends Configuration {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
/**
- * Settings for timeline service v2.0
+ * Settings for timeline service v2.0.
*/
public static final String TIMELINE_SERVICE_WRITER_CLASS =
TIMELINE_SERVICE_PREFIX + "writer.class";
@@ -2115,6 +2115,17 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.timelineservice" +
".storage.HBaseTimelineReaderImpl";
+ /**
+ * default schema prefix for hbase tables.
+ */
+ public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX =
+ "prod.";
+
+ /**
+ * config param name to override schema prefix.
+ */
+ public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME =
+ TIMELINE_SERVICE_PREFIX + "hbase-schema.prefix";
/** The setting that controls how often the timeline collector flushes the
* timeline writer.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0823dfe753..3d25722309 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2341,6 +2341,18 @@
259200000
+
+
+ The value of this parameter sets the prefix for all tables that are part of
+ timeline service in the hbase storage schema. It can be set to "dev."
+ or "staging." if it is to be used for development or staging instances.
+ This way the data in production tables stays in a separate set of tables
+ prefixed by "prod.".
+
+ yarn.timeline-service.hbase-schema.prefix
+ prod.
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java
new file mode 100644
index 0000000000..53045e5700
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+/**
+ * Unit tests for checking different schema prefixes.
+ */
+public class TestHBaseTimelineStorageSchema {
+ private static HBaseTestingUtility util;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ util.startMiniCluster();
+ }
+
+ private static void createSchema(Configuration conf) throws IOException {
+ TimelineSchemaCreator.createAllTables(conf, false);
+ }
+
+ @Test
+ public void createWithDefaultPrefix() throws IOException {
+ Configuration hbaseConf = util.getConfiguration();
+ createSchema(hbaseConf);
+ Connection conn = null;
+ conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin();
+
+ TableName entityTableName = BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(entityTableName));
+ assertTrue(entityTableName.getNameAsString().startsWith(
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
+ Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
+ assertNotNull(entityTable);
+
+ TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(flowRunTableName));
+ assertTrue(flowRunTableName.getNameAsString().startsWith(
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX));
+ Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ assertNotNull(flowRunTable);
+ }
+
+ @Test
+ public void createWithSetPrefix() throws IOException {
+ Configuration hbaseConf = util.getConfiguration();
+ String prefix = "unit-test.";
+ hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+ prefix);
+ createSchema(hbaseConf);
+ Connection conn = null;
+ conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin();
+
+ TableName entityTableName = BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(entityTableName));
+ assertTrue(entityTableName.getNameAsString().startsWith(prefix));
+ Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
+ assertNotNull(entityTable);
+
+ TableName flowRunTableName = BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(flowRunTableName));
+ assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
+ Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ assertNotNull(flowRunTable);
+
+ // create another set with a diff prefix
+ hbaseConf
+ .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
+ prefix = "yet-another-unit-test.";
+ hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+ prefix);
+ createSchema(hbaseConf);
+ entityTableName = BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(entityTableName));
+ assertTrue(entityTableName.getNameAsString().startsWith(prefix));
+ entityTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME));
+ assertNotNull(entityTable);
+
+ flowRunTableName = BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
+ assertTrue(admin.tableExists(flowRunTableName));
+ assertTrue(flowRunTableName.getNameAsString().startsWith(prefix));
+ flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ assertNotNull(flowRunTable);
+ hbaseConf
+ .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 2778f50df7..97d40fdd18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -30,7 +30,6 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContex
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.junit.AfterClass;
@@ -155,8 +155,9 @@ public class TestHBaseStorageFlowActivity {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow activity table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(
+ BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+ FlowActivityTable.DEFAULT_TABLE_NAME));
byte[] startRow =
new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey();
Get g = new Get(startRow);
@@ -286,8 +287,9 @@ public class TestHBaseStorageFlowActivity {
.getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(
+ BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+ FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
@@ -425,13 +427,13 @@ public class TestHBaseStorageFlowActivity {
new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
s.setStartRow(startRow);
String clusterStop = cluster + "1";
- byte[] stopRow =
- new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
- .getRowKey();
+ byte[] stopRow = new FlowActivityRowKey(clusterStop, appCreatedTime, user,
+ flow).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(
+ BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME,
+ FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 7f46a5a0fc..00fee69c52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -94,8 +94,8 @@ public class TestHBaseStorageFlowRun {
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
- TableName table = TableName.valueOf(hbaseConf.get(
- FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+ TableName table = BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME);
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
@@ -109,14 +109,14 @@ public class TestHBaseStorageFlowRun {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List regions = server.getOnlineRegions(table);
for (Region region : regions) {
- assertTrue(HBaseTimelineStorageUtils.isFlowRunTable(
- region.getRegionInfo(), hbaseConf));
+ assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
}
}
- table = TableName.valueOf(hbaseConf.get(
+ table = BaseTable.getTableName(hbaseConf,
FlowActivityTable.TABLE_NAME_CONF_NAME,
- FlowActivityTable.DEFAULT_TABLE_NAME));
+ FlowActivityTable.DEFAULT_TABLE_NAME);
if (admin.tableExists(table)) {
// check the regions.
// check in flow activity table
@@ -124,14 +124,13 @@ public class TestHBaseStorageFlowRun {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List regions = server.getOnlineRegions(table);
for (Region region : regions) {
- assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
- region.getRegionInfo(), hbaseConf));
+ assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
}
}
- table = TableName.valueOf(hbaseConf.get(
- EntityTable.TABLE_NAME_CONF_NAME,
- EntityTable.DEFAULT_TABLE_NAME));
+ table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME,
+ EntityTable.DEFAULT_TABLE_NAME);
if (admin.tableExists(table)) {
// check the regions.
// check in entity run table
@@ -139,8 +138,8 @@ public class TestHBaseStorageFlowRun {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List regions = server.getOnlineRegions(table);
for (Region region : regions) {
- assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
- region.getRegionInfo(), hbaseConf));
+ assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
+ hbaseConf));
}
}
}
@@ -220,8 +219,8 @@ public class TestHBaseStorageFlowRun {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
@@ -356,24 +355,24 @@ public class TestHBaseStorageFlowRun {
/*
* checks the batch limits on a scan
*/
- void checkFlowRunTableBatchLimit(String cluster, String user,
- String flow, long runid, Configuration c1) throws IOException {
+ void checkFlowRunTableBatchLimit(String cluster, String user, String flow,
+ long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
- byte[] startRow =
- new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+ byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid)
+ .getRowKey();
s.setStartRow(startRow);
// set a batch limit
int batchLimit = 2;
s.setBatch(batchLimit);
String clusterStop = cluster + "1";
- byte[] stopRow =
- new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+ byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid)
+ .getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn
- .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int loopCount = 0;
@@ -517,8 +516,8 @@ public class TestHBaseStorageFlowRun {
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
@@ -782,8 +781,8 @@ public class TestHBaseStorageFlowRun {
boolean checkMax) throws IOException {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index eb18e28243..a4c0e4498e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -53,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
@@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction {
private static HBaseTestingUtility util;
- private static final String METRIC_1 = "MAP_SLOT_MILLIS";
- private static final String METRIC_2 = "HDFS_BYTES_READ";
+ private static final String METRIC1 = "MAP_SLOT_MILLIS";
+ private static final String METRIC2 = "HDFS_BYTES_READ";
private final byte[] aRowKey = Bytes.toBytes("a");
private final byte[] aFamily = Bytes.toBytes("family");
@@ -89,8 +89,9 @@ public class TestHBaseStorageFlowRunCompaction {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
- /** Writes non numeric data into flow run table
- * reads it back.
+ /**
+ * writes non numeric data into flow run table.
+ * reads it back
*
* @throws Exception
*/
@@ -106,11 +107,10 @@ public class TestHBaseStorageFlowRunCompaction {
p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
valueBytes);
Configuration hbaseConf = util.getConfiguration();
- TableName table = TableName.valueOf(hbaseConf.get(
- FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
- Table flowRunTable = conn.getTable(table);
+ Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
flowRunTable.put(p);
Get g = new Get(rowKeyBytes);
@@ -156,11 +156,10 @@ public class TestHBaseStorageFlowRunCompaction {
value4Bytes);
Configuration hbaseConf = util.getConfiguration();
- TableName table = TableName.valueOf(hbaseConf.get(
- FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = null;
conn = ConnectionFactory.createConnection(hbaseConf);
- Table flowRunTable = conn.getTable(table);
+ Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
flowRunTable.put(p);
String rowKey2 = "nonNumericRowKey2";
@@ -262,7 +261,6 @@ public class TestHBaseStorageFlowRunCompaction {
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
// we expect all back in one next call
assertEquals(4, values.size());
- System.out.println(" values size " + values.size() + " " + batchLimit);
rowCount++;
}
// should get back 1 row with each invocation
@@ -321,10 +319,11 @@ public class TestHBaseStorageFlowRunCompaction {
}
// check in flow run table
- HRegionServer server = util.getRSForFirstRegionInTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- List regions = server.getOnlineRegions(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ HRegionServer server = util.getRSForFirstRegionInTable(
+ BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
+ FlowRunTable.DEFAULT_TABLE_NAME));
+ List regions = server.getOnlineRegions(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
// flush and compact all the regions of the primary table
@@ -349,8 +348,8 @@ public class TestHBaseStorageFlowRunCompaction {
new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
- Table table1 = conn.getTable(TableName
- .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ Table table1 = conn.getTable(BaseTable.getTableName(c1,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
@@ -364,13 +363,13 @@ public class TestHBaseStorageFlowRunCompaction {
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1);
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1);
assertTrue(values.containsKey(q));
assertEquals(141, Bytes.toLong(values.get(q)));
// check metric2
q = ColumnHelper.getColumnQualifier(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2);
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2);
assertTrue(values.containsKey(q));
assertEquals(57, Bytes.toLong(values.get(q)));
}
@@ -587,9 +586,9 @@ public class TestHBaseStorageFlowRunCompaction {
long cellTsFinalStart = 10001120L;
long cellTsFinal = cellTsFinalStart;
- long cellTsFinalStartNotExpire =
- TimestampGenerator.getSupplementedTimestamp(
- System.currentTimeMillis(), "application_10266666661166_118821");
+ long cellTsFinalStartNotExpire = TimestampGenerator
+ .getSupplementedTimestamp(System.currentTimeMillis(),
+ "application_10266666661166_118821");
long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
long cellTsNotFinalStart = currentTimestamp - 5;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index 8581aa45f2..93d809c003 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Implements behavior common to tables used in the timeline service storage. It
@@ -114,16 +115,42 @@ public abstract class BaseTable {
}
/**
- * Get the table name for this table.
+ * Get the table name for the input table.
*
- * @param hbaseConf HBase configuration from which table name will be fetched.
+ * @param conf HBase configuration from which table name will be fetched.
+ * @param tableName name of the table to be fetched
* @return A {@link TableName} object.
*/
- public TableName getTableName(Configuration hbaseConf) {
- TableName table =
- TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
- return table;
+ public static TableName getTableName(Configuration conf, String tableName) {
+ String tableSchemaPrefix = conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX);
+ return TableName.valueOf(tableSchemaPrefix + tableName);
+ }
+ /**
+ * Get the table name for this table.
+ *
+ * @param conf HBase configuration from which table name will be fetched.
+ * @return A {@link TableName} object.
+ */
+ public TableName getTableName(Configuration conf) {
+ String tableName = conf.get(tableNameConfName, defaultTableName);
+ return getTableName(conf, tableName);
+ }
+
+ /**
+ * Get the table name based on the input config parameters.
+ *
+ * @param conf HBase configuration from which table name will be fetched.
+ * @param tableNameInConf the table name parameter in conf.
+ * @param defaultTableName the default table name.
+ * @return A {@link TableName} object.
+ */
+ public static TableName getTableName(Configuration conf,
+ String tableNameInConf, String defaultTableName) {
+ String tableName = conf.get(tableNameInConf, defaultTableName);
+ return getTableName(conf, tableName);
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 221420eb50..a3c355fdee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -72,7 +72,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
- isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
+ isFlowRunRegion = FlowRunTable.isFlowRunTable(
region.getRegionInfo(), env.getConfiguration());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
index 9c6549ffc1..8fdd68527a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -139,4 +140,23 @@ public class FlowRunTable extends BaseTable {
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
+
+ public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+ Configuration conf) {
+ String regionTableName = hRegionInfo.getTable().getNameAsString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("regionTableName=" + regionTableName);
+ }
+ String flowRunTableName = BaseTable.getTableName(conf,
+ FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)
+ .getNameAsString();
+ if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" table is the flow run table!! "
+ + flowRunTableName);
+ }
+ return true;
+ }
+ return false;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 7f7d6405ae..203d950fa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -371,5 +371,4 @@ public final class TimelineStorageUtils {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index 04822c9fb0..6f98dcc53a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -127,6 +127,7 @@ New configuration parameters that are introduced with v.2 are marked bold.
| **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. |
| **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. |
| **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. |
+| **`yarn.timeline-service.schema.prefix`** | The schema prefix for hbase tables. Defaults to "prod.". |
#### Advanced configuration
@@ -187,8 +188,9 @@ Finally, run the schema creator tool to create the necessary tables:
The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you
are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables
-and continue to create other tables rather than failing the schema creation. When no option or '-help'
-('-h' for short) is provided, the command usage is printed.
+and continue to create other tables rather than failing the schema creation. By default, the tables
+will have a schema prefix of "prod.". When no option or '-help' ('-h' for short) is provided, the
+command usage is printed.
#### Enabling Timeline Service v.2
Following are the basic configurations to start Timeline service v.2: