YARN-7346. Add a profile to allow optional compilation for ATSv2 with HBase-2.0. Contributed by Haibo Chen and Rohith.

This commit is contained in:
Rohith Sharma K S 2018-03-06 10:42:03 +05:30
parent 745190ecdc
commit 55ba49dd07
25 changed files with 2086 additions and 231 deletions

View File

@ -236,7 +236,7 @@
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
<include>org.apache.hadoop:${hbase-server-artifactid}</include>
</includes>
<binaries>
<outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory>

View File

@ -49,9 +49,6 @@
<xerces.jdiff.version>2.11.0</xerces.jdiff.version>
<kafka.version>0.8.2.1</kafka.version>
<hbase.version>1.2.6</hbase.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
<hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
<commons-daemon.version>1.0.13</commons-daemon.version>
@ -146,6 +143,8 @@
<swagger-annotations-version>1.5.4</swagger-annotations-version>
<snakeyaml.version>1.16</snakeyaml.version>
<hbase.one.version>1.2.6</hbase.one.version>
<hbase.two.version>2.0.0-beta-1</hbase.two.version>
</properties>
<dependencyManagement>
@ -407,12 +406,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
@ -666,7 +659,6 @@
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.servlet</artifactId>
@ -1371,6 +1363,11 @@
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jruby.jcodings</groupId>
<artifactId>jcodings</artifactId>
<version>1.0.13</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -1839,6 +1836,59 @@
</plugins>
</build>
</profile>
<!-- The profile for building against HBase 1.2.x
This is the default.
-->
<profile>
<id>hbase1</id>
<activation>
<property>
<name>!hbase.profile</name>
</property>
</activation>
<properties>
<hbase.version>${hbase.one.version}</hbase.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
<hbase-compatible-guava.version>12.0.1</hbase-compatible-guava.version>
<hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-1</hbase-server-artifactid>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hbase-server-artifactid}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<!-- The profile for building against HBase 2.0.0.
Activate using: mvn -Dhbase.profile=2.0
-->
<profile>
<id>hbase2</id>
<activation>
<property>
<name>hbase.profile</name>
<value>2.0</value>
</property>
</activation>
<properties>
<hbase.version>${hbase.two.version}</hbase.version>
<hbase-compatible-hadoop.version>3.0.0</hbase-compatible-hadoop.version>
<hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
<hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-2</hbase-server-artifactid>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hbase-server-artifactid}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
</profiles>
<repositories>

View File

@ -27,7 +27,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-tests</artifactId>
<version>3.2.0-SNAPSHOT</version>
<name>Apache Hadoop YARN Timeline Service HBase tests</name>
<name>Apache Hadoop YARN TimelineService HBase tests</name>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
@ -82,18 +82,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@ -414,4 +402,68 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hbase1</id>
<activation>
<property>
<name>!hbase.profile</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<profile>
<id>hbase2</id>
<activation>
<property>
<name>hbase.profile</name>
<value>2.0</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-2</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this direct
dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hbase-compatible-hadoop.version}</version>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this direct
dependency -->
<!-- This is needed by HBaseTestingUtility -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -25,7 +25,6 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -41,7 +40,6 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@ -64,6 +62,7 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -89,7 +88,7 @@ public static void setupBeforeClass() throws Exception {
}
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
public void checkCoProcessorOff() throws Exception, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
TableName table = BaseTableRW.getTableName(hbaseConf,
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
@ -127,19 +126,9 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
}
private void checkCoprocessorExists(TableName table, boolean exists)
throws IOException, InterruptedException {
throws Exception {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
boolean found = false;
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
if (coprocName.contains("FlowRunCoprocessor")) {
found = true;
}
}
assertEquals(found, exists);
}
HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists);
}
/**

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -44,9 +45,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@ -327,20 +326,15 @@ public void testWriteFlowRunCompaction() throws Exception {
}
// check in flow run table
HRegionServer server = util.getRSForFirstRegionInTable(
BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME,
FlowRunTableRW.DEFAULT_TABLE_NAME));
List<Region> regions = server.getOnlineRegions(
BaseTableRW.getTableName(c1,
FlowRunTableRW.TABLE_NAME_CONF_NAME,
FlowRunTableRW.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
TableName flowRunTable = BaseTableRW.getTableName(c1,
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable);
// flush and compact all the regions of the primary table
for (Region region : regions) {
region.flush(true);
region.compact(true);
}
int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions(
server, flowRunTable);
assertTrue("Didn't find any regions for primary table!",
regionNum > 0);
// check flow run for one flow many apps
checkFlowRunTable(cluster, user, flow, runid, c1, 4);
@ -392,13 +386,10 @@ private void checkFlowRunTable(String cluster, String user, String flow,
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
CompactionRequest request = new CompactionRequest();
request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
(request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION));
FlowScannerOperation.MAJOR_COMPACTION);
assertNotNull(fs);
return fs;
}
@ -423,40 +414,45 @@ public void checkProcessSummationMoreCellsSumFinal2()
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@ -523,10 +519,12 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -537,10 +535,12 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@ -614,10 +614,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -628,10 +630,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -642,10 +646,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@ -697,10 +703,12 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
long cellValue2 = 28L;
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
@ -709,10 +717,11 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
@ -755,10 +764,12 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
@ -793,10 +804,12 @@ public void testProcessSummationOneCell() throws IOException {
// try for 1 cell with tag SUM
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);

View File

@ -40,8 +40,8 @@
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
@ -93,7 +93,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>provided</scope>
</dependency>

View File

@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
<name>Apache Hadoop YARN TimelineService HBase Server 1.2</name>
<version>3.2.0-SNAPSHOT</version>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir>
<!--Needed while compiling individual module-->
<hbase.version>${hbase.one.version}</hbase.version>
</properties>
<profiles>
<profile>
<id>hbase1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/assembly/coprocessor.xml</descriptor>
<attach>true</attach>
</configuration>
<executions>
<execution>
<id>create-coprocessor-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -16,7 +16,8 @@
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>coprocessor</id>
<formats>
<format>jar</format>
@ -30,7 +31,7 @@
<scope>runtime</scope>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1</include>
</includes>
</dependencySet>
</dependencySets>

View File

@ -20,7 +20,10 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
@ -28,6 +31,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A utility class used by hbase-server module.
@ -49,7 +53,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
Tag t = createTag(aggOp.getTagType(), attribute.getValue());
return t;
}
@ -57,7 +61,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
@ -96,6 +100,45 @@ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
newValue, tags);
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in byte array.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, byte[] tag) {
return new Tag(tagType, tag);
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in String.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, String tag) {
return createTag(tagType, Bytes.toBytes(tag));
}
/**
* Convert a cell to a list of tags.
* @param cell the cell to convert
* @return a list of tags
*/
public static List<Tag> convertCellAsTagList(Cell cell) {
return Tag.asList(
cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
}
/**
* Convert a list of tags to a byte array.
* @param tags the list of tags to convert
* @return byte array representation of the list of tags
*/
public static byte[] convertTagListToByteArray(List<Tag> tags) {
return Tag.fromList(tags);
}
/**
* returns app id from the list of tags.
*
@ -132,4 +175,48 @@ public static AggregationOperation getAggregationOperationFromTagsList(
}
return null;
}
// flush and compact all the regions of the primary table
/**
* Flush and compact all regions of a table.
* @param server region server
* @param table the table to flush and compact
* @return the number of regions flushed and compacted
*/
public static int flushCompactTableRegions(HRegionServer server,
TableName table) throws IOException {
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
region.flush(true);
region.compact(true);
}
return regions.size();
}
/**
* Check the existence of FlowRunCoprocessor in a table.
* @param server region server
* @param table table to check
* @param existenceExpected true if the FlowRunCoprocessor is expected
* to be loaded in the table, false otherwise
* @throws Exception
*/
public static void validateFlowRunCoprocessor(HRegionServer server,
TableName table, boolean existenceExpected) throws Exception {
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
boolean found = false;
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
if (coprocName.contains("FlowRunCoprocessor")) {
found = true;
}
}
if (found != existenceExpected) {
throw new Exception("FlowRunCoprocessor is" +
(existenceExpected ? " not " : " ") + "loaded in table " + table);
}
}
}
}

View File

@ -102,7 +102,8 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
tags.add(t);
}
}
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()

View File

@ -247,8 +247,7 @@ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
}
private AggregationOperation getCurrentAggOp(Cell cell) {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
// We assume that all the operations for a particular column are the same
return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
}
@ -316,8 +315,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
}
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String aggDim = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
@ -454,8 +452,7 @@ List<Cell> processSummationMajorCompaction(
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String appId = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
@ -491,13 +488,16 @@ List<Cell> processSummationMajorCompaction(
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List<Tag> tags = new ArrayList<Tag>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
Cell sumCell = HBaseTimelineServerUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),

View File

@ -0,0 +1,172 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-2</artifactId>
<name>Apache Hadoop YARN TimelineService HBase Server 2.0</name>
<version>3.2.0-SNAPSHOT</version>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir>
<!--Needed while compiling individual module-->
<hbase.version>${hbase.two.version}</hbase.version>
</properties>
<profiles>
<profile>
<id>hbase2</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- This is to work around the version divergence of
org.jruby.jcodings:jcodings pulled in by hbase-client -->
<dependency>
<groupId>org.jruby.jcodings</groupId>
<artifactId>jcodings</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>create-coprocessor-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptor>src/assembly/coprocessor.xml</descriptor>
<attach>true</attach>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,38 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.01
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>coprocessor</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-2</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A utility class used by hbase-server module.
*/
public final class HBaseTimelineServerUtils {
private HBaseTimelineServerUtils() {
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute Attribute from which tag has to be fetched.
* @return a HBase Tag.
*/
public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = createTag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
/**
* creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in byte array.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, byte[] tag) {
return new ArrayBackedTag(tagType, tag);
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in String.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, String tag) {
return createTag(tagType, Bytes.toBytes(tag));
}
/**
* Convert a cell to a list of tags.
* @param cell the cell to convert
* @return a list of tags
*/
public static List<Tag> convertCellAsTagList(Cell cell) {
return TagUtil.asList(
cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
}
/**
* Convert a list of tags to a byte array.
* @param tags the list of tags to convert
* @return byte array representation of the list of tags
*/
public static byte[] convertTagListToByteArray(List<Tag> tags) {
return TagUtil.fromList(tags);
}
/**
* returns app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return App Id as the AggregationCompactionDimension
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(Tag.cloneValue(t));
return appId;
}
}
return appId;
}
/**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise.
*
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
// flush and compact all the regions of the primary table
/**
* Flush and compact all regions of a table.
* @param server region server
* @param table the table to flush and compact
* @return the number of regions flushed and compacted
*/
public static int flushCompactTableRegions(HRegionServer server,
TableName table) throws IOException {
List<HRegion> regions = server.getRegions(table);
for (HRegion region : regions) {
region.flush(true);
region.compact(true);
}
return regions.size();
}
/**
* Check the existence of FlowRunCoprocessor in a table.
* @param server region server
* @param table table to check
* @param existenceExpected true if the FlowRunCoprocessor is expected
* to be loaded in the table, false otherwise
* @throws Exception
*/
public static void validateFlowRunCoprocessor(HRegionServer server,
TableName table, boolean existenceExpected) throws Exception {
List<HRegion> regions = server.getRegions(table);
for (HRegion region : regions) {
boolean found = false;
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
if (coprocName.contains("FlowRunCoprocessor")) {
found = true;
}
}
if (found != existenceExpected) {
throw new Exception("FlowRunCoprocessor is" +
(existenceExpected ? " not " : " ") + "loaded in table " + table);
}
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
* a set of utility classes used across backend storage reader and writer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,285 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Coprocessor for flow run table.
*/
public class FlowRunCoprocessor implements RegionCoprocessor, RegionObserver {
private static final Logger LOG =
LoggerFactory.getLogger(FlowRunCoprocessor.class);
private Region region;
/**
* generate a timestamp that is unique per row in a region this is per region.
*/
private final TimestampGenerator timestampGenerator =
new TimestampGenerator();
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
}
}
/*
* (non-Javadoc)
*
* This method adds the tags onto the cells in the Put. It is presumed that
* all the cells in one Put have the same set of Tags. The existing cell
* timestamp is overwritten for non-metric cells and each such cell gets a new
* unique timestamp generated by {@link TimestampGenerator}
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
* .hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Put,
* org.apache.hadoop.hbase.regionserver.wal.WALEdit,
* org.apache.hadoop.hbase.client.Durability)
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
WALEdit edit, Durability durability) throws IOException {
Map<String, byte[]> attributes = put.getAttributesMap();
// Assumption is that all the cells in a put are the same operation.
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute);
if (t != null) {
tags.add(t);
}
}
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
.entrySet()) {
List<Cell> newCells = new ArrayList<>(entry.getValue().size());
for (Cell cell : entry.getValue()) {
// for each cell in the put add the tags
// Assumption is that all the cells in
// one put are the same operation
// also, get a unique cell timestamp for non-metric cells
// this way we don't inadvertently overwrite cell versions
long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
tagByteArray));
}
newFamilyMap.put(entry.getKey(), newCells);
} // for each entry
// Update the family map for the Put
put.setFamilyCellMap(newFamilyMap);
}
}
/**
* Determines if the current cell's timestamp is to be used or a new unique
* cell timestamp is to be used. The reason this is done is to inadvertently
* overwrite cells when writes come in very fast. But for metric cells, the
* cell timestamp signifies the metric timestamp. Hence we don't want to
* overwrite it.
*
* @param timestamp
* @param tags
* @return cell timestamp
*/
private long getCellTimestamp(long timestamp, List<Tag> tags) {
// if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
// then use the generator
if (timestamp == HConstants.LATEST_TIMESTAMP) {
return timestampGenerator.getUniqueTimestamp();
} else {
return timestamp;
}
}
/*
* (non-Javadoc)
*
* Creates a {@link FlowScanner} Scan so that it can correctly process the
* contents of {@link FlowRunTable}.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
* .hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Get, java.util.List)
*/
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
Get get, List<Cell> results) throws IOException {
Scan scan = new Scan(get);
scan.setMaxVersions();
RegionScanner scanner = null;
try {
scanner = new FlowScanner(e.getEnvironment(), scan,
region.getScanner(scan), FlowScannerOperation.READ);
scanner.next(results);
e.bypass();
} finally {
if (scanner != null) {
scanner.close();
}
}
}
/*
* (non-Javadoc)
*
* Ensures that max versions are set for the Scan so that metrics can be
* correctly aggregated and min/max can be correctly determined.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
* .apache.hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Scan)
*/
@Override
public void preScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
throws IOException {
// set max versions for scan to see all
// versions to aggregate for metrics
scan.setMaxVersions();
}
/*
* (non-Javadoc)
*
* Creates a {@link FlowScanner} Scan so that it can correctly process the
* contents of {@link FlowRunTable}.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
* org.apache.hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Scan,
* org.apache.hadoop.hbase.regionserver.RegionScanner)
*/
@Override
public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
return new FlowScanner(e.getEnvironment(), scan,
scanner, FlowScannerOperation.READ);
}
@Override
public InternalScanner preFlush(
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker cycleTracker)
throws IOException {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
+ " flushableSize=" + store.getFlushableSize()
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
+ store.getMajorCompactedCellsCount() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
}
return new FlowScanner(c.getEnvironment(), scanner,
FlowScannerOperation.FLUSH);
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("postFlush store = " + store.getColumnFamilyName()
+ " flushableSize=" + store.getFlushableSize()
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
+ store.getMajorCompactedCellsCount() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
}
}
@Override
public InternalScanner preCompact(
ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType,
CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
if (request != null) {
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION);
LOG.info("Compactionrequest= " + request.toString() + " "
+ requestOp.toString() + " RegionName=" + e.getEnvironment()
.getRegion().getRegionInfo().getRegionNameAsString());
}
return new FlowScanner(e.getEnvironment(), scanner, requestOp);
}
}

View File

@ -0,0 +1,723 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
* table. Looks through the list of cells per row, checks their tags and does
* operation on those cells as per the cell tags. Transforms reads of the stored
* metrics into calculated sums for each column Also, finds the min and max for
* start and end times in a flow run.
*/
class FlowScanner implements RegionScanner, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(FlowScanner.class);
/**
* use a special application id to represent the flow id this is needed since
* TimestampGenerator parses the app id to generate a cell timestamp.
*/
private static final String FLOW_APP_ID = "application_00000000000_0000";
private final Region region;
private final InternalScanner flowRunScanner;
private final int batchSize;
private final long appFinalValueRetentionThreshold;
private RegionScanner regionScanner;
private boolean hasMore;
private byte[] currentRow;
private List<Cell> availableCells = new ArrayList<>();
private int currentIndex;
private FlowScannerOperation action = FlowScannerOperation.READ;
FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
FlowScannerOperation action) {
this(env, null, internalScanner, action);
}
FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
InternalScanner internalScanner, FlowScannerOperation action) {
this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
// TODO initialize other scan attributes like Scan#maxResultSize
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
}
this.action = action;
if (env == null) {
this.appFinalValueRetentionThreshold =
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
this.region = null;
} else {
this.region = env.getRegion();
Configuration hbaseConf = env.getConfiguration();
this.appFinalValueRetentionThreshold = hbaseConf.getLong(
YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
}
if (LOG.isDebugEnabled()) {
LOG.debug(" batch size=" + batchSize);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
*/
@Override
public HRegionInfo getRegionInfo() {
return new HRegionInfo(region.getRegionInfo());
}
@Override
public boolean nextRaw(List<Cell> cells) throws IOException {
return nextRaw(cells, ScannerContext.newBuilder().build());
}
@Override
public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
throws IOException {
return nextInternal(cells, scannerContext);
}
@Override
public boolean next(List<Cell> cells) throws IOException {
return next(cells, ScannerContext.newBuilder().build());
}
@Override
public boolean next(List<Cell> cells, ScannerContext scannerContext)
throws IOException {
return nextInternal(cells, scannerContext);
}
/**
* Get value converter associated with a column or a column prefix. If nothing
* matches, generic converter is returned.
* @param colQualifierBytes
* @return value converter implementation.
*/
private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
// Iterate over all the column prefixes for flow run table and get the
// appropriate converter for the column qualifier passed if prefix matches.
for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
colQualifierBytes, 0, colPrefixBytes.length) == 0) {
return colPrefix.getValueConverter();
}
}
// Iterate over all the columns for flow run table and get the
// appropriate converter for the column qualifier passed if match occurs.
for (FlowRunColumn column : FlowRunColumn.values()) {
if (Bytes.compareTo(
column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
return column.getValueConverter();
}
}
// Return generic converter if nothing matches.
return GenericConverter.getInstance();
}
/**
* This method loops through the cells in a given row of the
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
* to process the contents. It then calculates the sum or min or max for each
* column or returns the cell as is.
*
* @param cells
* @param scannerContext
* @return true if next row is available for the scanner, false otherwise
* @throws IOException
*/
private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
throws IOException {
Cell cell = null;
startNext();
// Loop through all the cells in this row
// For min/max/metrics we do need to scan the entire set of cells to get the
// right one
// But with flush/compaction, the number of cells being scanned will go down
// cells are grouped per column qualifier then sorted by cell timestamp
// (latest to oldest) per column qualifier
// So all cells in one qualifier come one after the other before we see the
// next column qualifier
ByteArrayComparator comp = new ByteArrayComparator();
byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
AggregationOperation currentAggOp = null;
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0;
long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
int limit = batchSize;
while (limit <= 0 || addedCnt < limit) {
cell = peekAtNextCell(scannerContext);
if (cell == null) {
break;
}
byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
if (previousColumnQualifier == null) {
// first time in loop
previousColumnQualifier = currentColumnQualifier;
}
converter = getValueConverter(currentColumnQualifier);
if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
converter, currentTimestamp);
resetState(currentColumnCells, alreadySeenAggDim);
previousColumnQualifier = currentColumnQualifier;
currentAggOp = getCurrentAggOp(cell);
converter = getValueConverter(currentColumnQualifier);
}
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
converter, scannerContext);
nextCell(scannerContext);
}
if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
currentTimestamp);
if (LOG.isDebugEnabled()) {
if (addedCnt > 0) {
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+ " rowKey="
+ FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
} else {
LOG.debug("emitted no cells for " + this.action);
}
}
}
return hasMore();
}
private AggregationOperation getCurrentAggOp(Cell cell) {
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
// We assume that all the operations for a particular column are the same
return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
}
/**
* resets the parameters to an initialized state for next loop iteration.
*/
private void resetState(SortedSet<Cell> currentColumnCells,
Set<String> alreadySeenAggDim) {
currentColumnCells.clear();
alreadySeenAggDim.clear();
}
private void collectCells(SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, Cell cell,
Set<String> alreadySeenAggDim, ValueConverter converter,
ScannerContext scannerContext) throws IOException {
if (currentAggOp == null) {
// not a min/max/metric cell, so just return it as is
currentColumnCells.add(cell);
return;
}
switch (currentAggOp) {
case GLOBAL_MIN:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
Cell currentMinCell = currentColumnCells.first();
Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
(NumericValueConverter) converter);
if (!currentMinCell.equals(newMinCell)) {
currentColumnCells.remove(currentMinCell);
currentColumnCells.add(newMinCell);
}
}
break;
case GLOBAL_MAX:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
Cell currentMaxCell = currentColumnCells.first();
Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
(NumericValueConverter) converter);
if (!currentMaxCell.equals(newMaxCell)) {
currentColumnCells.remove(currentMaxCell);
currentColumnCells.add(newMaxCell);
}
}
break;
case SUM:
case SUM_FINAL:
if (LOG.isTraceEnabled()) {
LOG.trace("In collect cells "
+ " FlowSannerOperation="
+ this.action
+ " currentAggOp="
+ currentAggOp
+ " cell qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell))
+ " cell value= "
+ converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp());
}
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String aggDim = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
// if this agg dimension has already been seen,
// since they show up in sorted order
// we drop the rest which are older
// in other words, this cell is older than previously seen cells
// for that agg dim
// but when this agg dim is not seen,
// consider this cell in our working set
currentColumnCells.add(cell);
alreadySeenAggDim.add(aggDim);
}
break;
default:
break;
} // end of switch case
}
/*
* Processes the cells in input param currentColumnCells and populates
* List<Cell> cells as the output based on the input AggregationOperation
* parameter.
*/
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, ValueConverter converter,
long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
}
if (currentAggOp == null) {
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
if (LOG.isTraceEnabled()) {
LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+ currentColumnCells.size() + " currentAggOp" + currentAggOp);
}
switch (currentAggOp) {
case GLOBAL_MIN:
case GLOBAL_MAX:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case SUM:
case SUM_FINAL:
switch (action) {
case FLUSH:
case MINOR_COMPACTION:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case READ:
Cell sumCell = processSummation(currentColumnCells,
(NumericValueConverter) converter);
cells.add(sumCell);
return 1;
case MAJOR_COMPACTION:
List<Cell> finalCells = processSummationMajorCompaction(
currentColumnCells, (NumericValueConverter) converter,
currentTimestamp);
cells.addAll(finalCells);
return finalCells.size();
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
}
/*
* Returns a cell whose value is the sum of all cell values in the input set.
* The new cell created has the timestamp of the most recent metric cell. The
* sum of a metric for a flow run is the summation at the point of the last
* metric update in that flow till that time.
*/
private Cell processSummation(SortedSet<Cell> currentColumnCells,
NumericValueConverter converter) throws IOException {
Number sum = 0;
Number currentValue = 0;
long ts = 0L;
long mostCurrentTimestamp = 0L;
Cell mostRecentCell = null;
for (Cell cell : currentColumnCells) {
currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
ts = cell.getTimestamp();
if (mostCurrentTimestamp < ts) {
mostCurrentTimestamp = ts;
mostRecentCell = cell;
}
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
Cell sumCell =
HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
/**
* Returns a list of cells that contains
*
* A) the latest cells for applications that haven't finished yet
* B) summation
* for the flow, based on applications that have completed and are older than
* a certain time
*
* The new cell created has the timestamp of the most recent metric cell. The
* sum of a metric for a flow run is the summation at the point of the last
* metric update in that flow till that time.
*/
@VisibleForTesting
List<Cell> processSummationMajorCompaction(
SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
long currentTimestamp)
throws IOException {
Number sum = 0;
Number currentValue = 0;
long ts = 0L;
boolean summationDone = false;
List<Cell> finalCells = new ArrayList<Cell>();
if (currentColumnCells == null) {
return finalCells;
}
if (LOG.isDebugEnabled()) {
LOG.debug("In processSummationMajorCompaction,"
+ " will drop cells older than " + currentTimestamp
+ " CurrentColumnCells size=" + currentColumnCells.size());
}
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String appId = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
sum = converter.add(sum, currentValue);
summationDone = true;
if (LOG.isTraceEnabled()) {
LOG.trace("reading flow app id sum=" + sum);
}
} else {
currentValue = (Number) converter.decodeValue(CellUtil
.cloneValue(cell));
// read the timestamp truncated by the generator
ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
if ((cellAggOp == AggregationOperation.SUM_FINAL)
&& ((ts + this.appFinalValueRetentionThreshold)
< currentTimestamp)) {
sum = converter.add(sum, currentValue);
summationDone = true;
if (LOG.isTraceEnabled()) {
LOG.trace("MAJOR COMPACTION loop sum= " + sum
+ " discarding now: " + " qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+ converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp() + " " + this.action);
}
} else {
// not a final value but it's the latest cell for this app
// so include this cell in the list of cells to write back
finalCells.add(cell);
}
}
}
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List<Tag> tags = new ArrayList<Tag>();
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
t = HBaseTimelineServerUtils.createTag(
AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
Cell sumCell = HBaseTimelineServerUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
CellUtil.cloneQualifier(anyCell),
TimestampGenerator.getSupplementedTimestamp(
System.currentTimeMillis(), FLOW_APP_ID),
converter.encodeValue(sum), tagByteArray);
finalCells.add(sumCell);
if (LOG.isTraceEnabled()) {
LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ " " + this.action);
}
LOG.info("After major compaction for qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ " with currentColumnCells.size="
+ currentColumnCells.size()
+ " returning finalCells.size=" + finalCells.size()
+ " with sum=" + sum.longValue()
+ " with cell timestamp " + sumCell.getTimestamp());
} else {
String qualifier = "";
LOG.info("After major compaction for qualifier=" + qualifier
+ " with currentColumnCells.size="
+ currentColumnCells.size()
+ " returning finalCells.size=" + finalCells.size()
+ " with zero sum="
+ sum.longValue());
}
return finalCells;
}
/**
* Determines which cell is to be returned based on the values in each cell
* and the comparison operation MIN or MAX.
*
* @param previouslyChosenCell
* @param currentCell
* @param currentAggOp
* @return the cell which is the min (or max) cell
* @throws IOException
*/
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
AggregationOperation currentAggOp, NumericValueConverter converter)
throws IOException {
if (previouslyChosenCell == null) {
return currentCell;
}
try {
Number previouslyChosenCellValue = (Number)converter.decodeValue(
CellUtil.cloneValue(previouslyChosenCell));
Number currentCellValue = (Number) converter.decodeValue(CellUtil
.cloneValue(currentCell));
switch (currentAggOp) {
case GLOBAL_MIN:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) < 0) {
// new value is minimum, hence return this cell
return currentCell;
} else {
// previously chosen value is miniumum, hence return previous min cell
return previouslyChosenCell;
}
case GLOBAL_MAX:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) > 0) {
// new value is max, hence return this cell
return currentCell;
} else {
// previously chosen value is max, hence return previous max cell
return previouslyChosenCell;
}
default:
return currentCell;
}
} catch (IllegalArgumentException iae) {
LOG.error("caught iae during conversion to long ", iae);
return currentCell;
}
}
@Override
public void close() throws IOException {
if (flowRunScanner != null) {
flowRunScanner.close();
} else {
LOG.warn("scanner close called but scanner is null");
}
}
/**
* Called to signal the start of the next() call by the scanner.
*/
public void startNext() {
currentRow = null;
}
/**
* Returns whether or not the underlying scanner has more rows.
*/
public boolean hasMore() {
return currentIndex < availableCells.size() ? true : hasMore;
}
/**
* Returns the next available cell for the current row and advances the
* pointer to the next cell. This method can be called multiple times in a row
* to advance through all the available cells.
*
* @param scannerContext
* context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException
*/
public Cell nextCell(ScannerContext scannerContext) throws IOException {
Cell cell = peekAtNextCell(scannerContext);
if (cell != null) {
currentIndex++;
}
return cell;
}
/**
* Returns the next available cell for the current row, without advancing the
* pointer. Calling this method multiple times in a row will continue to
* return the same cell.
*
* @param scannerContext
* context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException if any problem is encountered while grabbing the next
* cell.
*/
public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
if (currentIndex >= availableCells.size()) {
// done with current batch
availableCells.clear();
currentIndex = 0;
hasMore = flowRunScanner.next(availableCells, scannerContext);
}
Cell cell = null;
if (currentIndex < availableCells.size()) {
cell = availableCells.get(currentIndex);
if (currentRow == null) {
currentRow = CellUtil.cloneRow(cell);
} else if (!CellUtil.matchingRow(cell, currentRow)) {
// moved on to the next row
// don't use the current cell
// also signal no more cells for this row
return null;
}
}
return cell;
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
*/
@Override
public long getMaxResultSize() {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's scanner is not a RegionScanner");
}
return regionScanner.getMaxResultSize();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
*/
@Override
public long getMvccReadPoint() {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.getMvccReadPoint();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
*/
@Override
public boolean isFilterDone() throws IOException {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.isFilterDone();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
*/
@Override
public boolean reseek(byte[] bytes) throws IOException {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.reseek() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.reseek(bytes);
}
@Override
public int getBatch() {
return batchSize;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
/**
* Identifies the scanner operation on the {@link FlowRunTable}.
*/
public enum FlowScannerOperation {
/**
* If the scanner is opened for reading
* during preGet or preScan.
*/
READ,
/**
* If the scanner is opened during preFlush.
*/
FLUSH,
/**
* If the scanner is opened during minor Compaction.
*/
MINOR_COMPACTION,
/**
* If the scanner is opened during major Compaction.
*/
MAJOR_COMPACTION
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
* contains classes related to implementation for flow related tables, viz. flow
* run table and flow activity table.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage contains
* classes which define and implement reading and writing to backend storage.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -24,138 +24,62 @@
<groupId>org.apache.hadoop</groupId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<name>Apache Hadoop YARN TimelineService HBase Server</name>
<version>3.2.0-SNAPSHOT</version>
<name>Apache Hadoop YARN TimelineService HBase Servers</name>
<packaging>pom</packaging>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/assembly/coprocessor.xml</descriptor>
<attach>true</attach>
</configuration>
<executions>
<execution>
<id>create-coprocessor-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hbase1</id>
<activation>
<property>
<name>!hbase.profile</name>
</property>
</activation>
<modules>
<module>hadoop-yarn-server-timelineservice-hbase-server-1</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>hadoop-yarn-server-timelineservice-hbase-server-2/**/*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hbase2</id>
<activation>
<property>
<name>hbase.profile</name>
<value>2.0</value>
</property>
</activation>
<modules>
<module>hadoop-yarn-server-timelineservice-hbase-server-2</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>hadoop-yarn-server-timelineservice-hbase-server-1/**/*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>