diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 2c266b617f..382c967263 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -236,7 +236,7 @@ - org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server + org.apache.hadoop:${hbase-server-artifactid} share/hadoop/${hadoop.component}/timelineservice diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 7cc68bb691..d23a548f5a 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,9 +49,6 @@ 2.11.0 0.8.2.1 - 1.2.6 - 2.5.1 - 11.0.2 ${project.version} 1.0.13 @@ -407,12 +404,6 @@ ${project.version} - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-server - ${project.version} - - org.apache.hadoop hadoop-yarn-applications-distributedshell @@ -666,7 +657,6 @@ jsp-api 2.1 - org.glassfish javax.servlet @@ -1839,6 +1829,64 @@ + + + hbase1 + + + !hbase.profile + + + + 1.2.6 + 2.5.1 + 11.0.2 + hadoop-yarn-server-timelineservice-hbase-server-1 + + + + + org.apache.hadoop + ${hbase-server-artifactid} + ${project.version} + + + + + + + hbase2 + + + hbase.profile + 2.0 + + + + 2.0.0-beta-1 + 3.0.0 + 11.0.2 + hadoop-yarn-server-timelineservice-hbase-server-2 + + + + + org.apache.hadoop + ${hbase-server-artifactid} + ${project.version} + + + org.jruby.jcodings + jcodings + 1.0.13 + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index d9f992d056..2c8d5dd241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -27,7 +27,7 @@ 4.0.0 hadoop-yarn-server-timelineservice-hbase-tests 3.2.0-SNAPSHOT - Apache Hadoop YARN Timeline Service HBase tests + Apache Hadoop YARN TimelineService HBase tests @@ -82,18 +82,6 @@ - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-server - test - - - org.apache.hadoop - hadoop-common - - - - org.apache.hadoop hadoop-common @@ -414,4 +402,68 @@ + + + + hbase1 + + + !hbase.profile + + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-server-1 + test + + + org.apache.hadoop + hadoop-common + + + + + + + + hbase2 + + + hbase.profile + 2.0 + + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-server-2 + test + + + org.apache.hadoop + hadoop-common + + + + + + org.apache.hadoop + hadoop-hdfs-client + ${hbase-compatible-hadoop.version} + test + + + + + org.mockito + mockito-all + test + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 622b0eba22..c7d0d4e3e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.EnumSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -64,6 +62,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -89,7 +88,7 @@ public static void setupBeforeClass() throws Exception { } @Test - public void checkCoProcessorOff() throws IOException, InterruptedException { + public void checkCoProcessorOff() throws Exception, InterruptedException { Configuration hbaseConf = util.getConfiguration(); TableName table = BaseTableRW.getTableName(hbaseConf, FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); @@ -127,19 +126,9 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { } private void checkCoprocessorExists(TableName table, boolean exists) - throws IOException, InterruptedException { + throws Exception { HRegionServer server = util.getRSForFirstRegionInTable(table); - List regions = server.getOnlineRegions(table); - for (Region region : regions) { - boolean found = false; - Set coprocs = region.getCoprocessorHost().getCoprocessors(); - for (String coprocName : coprocs) { - if (coprocName.contains("FlowRunCoprocessor")) { - found = true; - } - } - assertEquals(found, exists); - } + HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 31be285e41..2ff37afc98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -44,9 +45,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -327,20 +326,15 @@ public void testWriteFlowRunCompaction() throws Exception { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable( - BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME, - FlowRunTableRW.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions( - BaseTableRW.getTableName(c1, - FlowRunTableRW.TABLE_NAME_CONF_NAME, - FlowRunTableRW.DEFAULT_TABLE_NAME)); - assertTrue("Didn't find any regions for primary table!", - regions.size() > 0); + TableName flowRunTable = BaseTableRW.getTableName(c1, + FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); + HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable); + // flush and compact all the regions of the primary table - for (Region region : regions) { - region.flush(true); - region.compact(true); - } + int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions( + server, flowRunTable); + assertTrue("Didn't find any regions for primary table!", + regionNum > 0); // check flow run for one flow many apps checkFlowRunTable(cluster, user, flow, runid, c1, 4); @@ -392,13 +386,10 @@ private void checkFlowRunTable(String cluster, String user, String flow, private FlowScanner getFlowScannerForTestingCompaction() { // create a FlowScanner object with the sole purpose of invoking a process // summation; - CompactionRequest request = new CompactionRequest(); - request.setIsMajor(true, true); // okay to pass in nulls for the constructor arguments // because all we want to do is invoke the process summation FlowScanner fs = new FlowScanner(null, null, - (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION)); + FlowScannerOperation.MAJOR_COMPACTION); assertNotNull(fs); return fs; } @@ -423,40 +414,45 @@ public void checkProcessSummationMoreCellsSumFinal2() SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_91188"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_12700000001_29102"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a recent timestamp and attribute SUM_FINAL Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_191780000000001_8195"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); currentColumnCells.add(c3); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_191780000000001_98104"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); @@ -523,10 +519,12 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { // insert SUM_FINAL cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -537,10 +535,12 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { // add SUM cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -614,10 +614,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // insert SUM_FINAL cells which will expire for (int i = 0; i < countFinal; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -628,10 +630,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // insert SUM_FINAL cells which will NOT expire for (int i = 0; i < countFinalNotExpire; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -642,10 +646,12 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // add SUM cells for (int i = 0; i < countNotFinal; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -697,10 +703,12 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { long cellValue2 = 28L; List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp and attribute SUM_FINAL @@ -709,10 +717,11 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { currentColumnCells.add(c1); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_100000000001_119101"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, @@ -755,10 +764,12 @@ public void testProcessSummationOneCellSumFinal() throws IOException { // note down the current timestamp long currentTimestamp = System.currentTimeMillis(); List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp @@ -793,10 +804,12 @@ public void testProcessSummationOneCell() throws IOException { // try for 1 cell with tag SUM List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml index a1db497111..3602f02003 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml @@ -40,8 +40,8 @@ - commons-logging - commons-logging + org.slf4j + slf4j-api @@ -93,7 +93,7 @@ org.apache.hadoop - hadoop-yarn-server-applicationhistoryservice + hadoop-yarn-server-common provided diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml new file mode 100644 index 0000000000..df7c5e3f06 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml @@ -0,0 +1,186 @@ + + + + + hadoop-yarn-server-timelineservice-hbase-server + org.apache.hadoop + 3.2.0-SNAPSHOT + + + 4.0.0 + hadoop-yarn-server-timelineservice-hbase-server-1 + Apache Hadoop YARN TimelineService HBase Server 1.2 + 3.2.0-SNAPSHOT + + + + ${project.parent.parent.parent.parent.basedir} + + + + + default + + true + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + default-compile + none + + + + + + + + + hbase1 + + + !hbase.profile + + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-common + + + + org.slf4j + slf4j-api + + + + com.google.guava + guava + + + + org.apache.hadoop + hadoop-annotations + provided + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-yarn-api + provided + + + + org.apache.hbase + hbase-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty-util + + + + + + org.apache.hbase + hbase-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + + + org.apache.hbase + hbase-server + provided + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-hdfs-client + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + jetty-sslengine + + + + + + + + + maven-assembly-plugin + + src/assembly/coprocessor.xml + true + + + + create-coprocessor-jar + prepare-package + + single + + + + + + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml index 01ff0dd063..dd53bf22c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml @@ -16,7 +16,8 @@ --> + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 + http://maven.apache.org/xsd/assembly-1.1.3.xsd"> coprocessor jar @@ -30,7 +31,7 @@ runtime org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common - org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server + org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java similarity index 60% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java index 5c07670a62..3a9e2596c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java @@ -20,7 +20,10 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; @@ -28,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; /** * A utility class used by hbase-server module. @@ -49,7 +53,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) { AggregationOperation aggOp = AggregationOperation .getAggregationOperation(attribute.getKey()); if (aggOp != null) { - Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + Tag t = createTag(aggOp.getTagType(), attribute.getValue()); return t; } @@ -57,7 +61,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) { AggregationCompactionDimension.getAggregationCompactionDimension( attribute.getKey()); if (aggCompactDim != null) { - Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue()); return t; } return null; @@ -96,6 +100,45 @@ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, newValue, tags); } + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in byte array. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, byte[] tag) { + return new Tag(tagType, tag); + } + + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in String. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, String tag) { + return createTag(tagType, Bytes.toBytes(tag)); + } + + /** + * Convert a cell to a list of tags. + * @param cell the cell to convert + * @return a list of tags + */ + public static List convertCellAsTagList(Cell cell) { + return Tag.asList( + cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + } + + /** + * Convert a list of tags to a byte array. + * @param tags the list of tags to convert + * @return byte array representation of the list of tags + */ + public static byte[] convertTagListToByteArray(List tags) { + return Tag.fromList(tags); + } + /** * returns app id from the list of tags. * @@ -132,4 +175,48 @@ public static AggregationOperation getAggregationOperationFromTagsList( } return null; } + + // flush and compact all the regions of the primary table + + /** + * Flush and compact all regions of a table. + * @param server region server + * @param table the table to flush and compact + * @return the number of regions flushed and compacted + */ + public static int flushCompactTableRegions(HRegionServer server, + TableName table) throws IOException { + List regions = server.getOnlineRegions(table); + for (Region region : regions) { + region.flush(true); + region.compact(true); + } + return regions.size(); + } + + /** + * Check the existence of FlowRunCoprocessor in a table. + * @param server region server + * @param table table to check + * @param existenceExpected true if the FlowRunCoprocessor is expected + * to be loaded in the table, false otherwise + * @throws Exception + */ + public static void validateFlowRunCoprocessor(HRegionServer server, + TableName table, boolean existenceExpected) throws Exception { + List regions = server.getOnlineRegions(table); + for (Region region : regions) { + boolean found = false; + Set coprocs = region.getCoprocessorHost().getCoprocessors(); + for (String coprocName : coprocs) { + if (coprocName.contains("FlowRunCoprocessor")) { + found = true; + } + } + if (found != existenceExpected) { + throw new Exception("FlowRunCoprocessor is" + + (existenceExpected ? " not " : " ") + "loaded in table " + table); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 41a371b522..c526f58517 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -102,7 +102,8 @@ public void prePut(ObserverContext e, Put put, tags.add(t); } } - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); NavigableMap> newFamilyMap = new TreeMap<>( Bytes.BYTES_COMPARATOR); for (Map.Entry> entry : put.getFamilyCellMap() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 7f09e518ce..31122cae65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -247,8 +247,7 @@ private boolean nextInternal(List cells, ScannerContext scannerContext) } private AggregationOperation getCurrentAggOp(Cell cell) { - List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), - cell.getTagsLength()); + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); // We assume that all the operations for a particular column are the same return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags); } @@ -316,8 +315,7 @@ private void collectCells(SortedSet currentColumnCells, } // only if this app has not been seen yet, add to current column cells - List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), - cell.getTagsLength()); + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); String aggDim = HBaseTimelineServerUtils .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { @@ -454,8 +452,7 @@ List processSummationMajorCompaction( for (Cell cell : currentColumnCells) { AggregationOperation cellAggOp = getCurrentAggOp(cell); // if this is the existing flow sum cell - List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), - cell.getTagsLength()); + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); String appId = HBaseTimelineServerUtils .getAggregationCompactionDimension(tags); if (appId == FLOW_APP_ID) { @@ -491,13 +488,16 @@ List processSummationMajorCompaction( if (summationDone) { Cell anyCell = currentColumnCells.first(); List tags = new ArrayList(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), Bytes.toBytes(FLOW_APP_ID)); tags.add(t); - t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationCompactionDimension.APPLICATION_ID.getTagType(), Bytes.toBytes(FLOW_APP_ID)); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); Cell sumCell = HBaseTimelineServerUtils.createNewCell( CellUtil.cloneRow(anyCell), CellUtil.cloneFamily(anyCell), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml new file mode 100644 index 0000000000..0b2cb2fed6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml @@ -0,0 +1,194 @@ + + + + + hadoop-yarn-server-timelineservice-hbase-server + org.apache.hadoop + 3.2.0-SNAPSHOT + + 4.0.0 + + hadoop-yarn-server-timelineservice-hbase-server-2 + Apache Hadoop YARN TimelineService HBase Server 2.0 + 3.2.0-SNAPSHOT + + + + ${project.parent.parent.parent.parent.basedir} + + + + + default + + true + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + default-compile + none + + + + + + + + + hbase2 + + + hbase.profile + 2.0 + + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-common + + + + org.slf4j + slf4j-api + + + + com.google.guava + guava + + + + org.apache.hadoop + hadoop-annotations + provided + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-yarn-api + provided + + + + org.apache.hbase + hbase-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty-util + + + + + + org.apache.hbase + hbase-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + + + + org.jruby.jcodings + jcodings + + + + org.apache.hbase + hbase-server + provided + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-hdfs-client + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + jetty-sslengine + + + + + + + + + maven-assembly-plugin + + + create-coprocessor-jar + prepare-package + + single + + + src/assembly/coprocessor.xml + true + + + + + + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml new file mode 100644 index 0000000000..faadc1d647 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml @@ -0,0 +1,38 @@ + + + coprocessor + + jar + + false + + + / + true + true + runtime + + org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common + org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-2 + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java new file mode 100644 index 0000000000..cf2d5e0335 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagUtil; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A utility class used by hbase-server module. + */ +public final class HBaseTimelineServerUtils { + private HBaseTimelineServerUtils() { + } + + /** + * Creates a {@link Tag} from the input attribute. + * + * @param attribute Attribute from which tag has to be fetched. + * @return a HBase Tag. + */ + public static Tag getTagFromAttribute(Map.Entry attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = createTag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = + AggregationCompactionDimension.getAggregationCompactionDimension( + attribute.getKey()); + if (aggCompactDim != null) { + Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + + /** + * creates a new cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell. + */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * creates a cell with the given inputs. + * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in byte array. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, byte[] tag) { + return new ArrayBackedTag(tagType, tag); + } + + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in String. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, String tag) { + return createTag(tagType, Bytes.toBytes(tag)); + } + + /** + * Convert a cell to a list of tags. + * @param cell the cell to convert + * @return a list of tags + */ + public static List convertCellAsTagList(Cell cell) { + return TagUtil.asList( + cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + } + + /** + * Convert a list of tags to a byte array. + * @param tags the list of tags to convert + * @return byte array representation of the list of tags + */ + public static byte[] convertTagListToByteArray(List tags) { + return TagUtil.fromList(tags); + } + + /** + * returns app id from the list of tags. + * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension + */ + public static String getAggregationCompactionDimension(List tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(Tag.cloneValue(t)); + return appId; + } + } + return appId; + } + + /** + * Returns the first seen aggregation operation as seen in the list of input + * tags or null otherwise. + * + * @param tags list of HBase tags. + * @return AggregationOperation + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + // flush and compact all the regions of the primary table + + /** + * Flush and compact all regions of a table. + * @param server region server + * @param table the table to flush and compact + * @return the number of regions flushed and compacted + */ + public static int flushCompactTableRegions(HRegionServer server, + TableName table) throws IOException { + List regions = server.getRegions(table); + for (HRegion region : regions) { + region.flush(true); + region.compact(true); + } + return regions.size(); + } + + /** + * Check the existence of FlowRunCoprocessor in a table. + * @param server region server + * @param table table to check + * @param existenceExpected true if the FlowRunCoprocessor is expected + * to be loaded in the table, false otherwise + * @throws Exception + */ + public static void validateFlowRunCoprocessor(HRegionServer server, + TableName table, boolean existenceExpected) throws Exception { + List regions = server.getRegions(table); + for (HRegion region : regions) { + boolean found = false; + Set coprocs = region.getCoprocessorHost().getCoprocessors(); + for (String coprocName : coprocs) { + if (coprocName.contains("FlowRunCoprocessor")) { + found = true; + } + } + if (found != existenceExpected) { + throw new Exception("FlowRunCoprocessor is" + + (existenceExpected ? " not " : " ") + "loaded in table " + table); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java new file mode 100644 index 0000000000..0df5b8af84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains + * a set of utility classes used across backend storage reader and writer. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java new file mode 100644 index 0000000000..16caca4f5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Coprocessor for flow run table. + */ +public class FlowRunCoprocessor implements RegionCoprocessor, RegionObserver { + + private static final Logger LOG = + LoggerFactory.getLogger(FlowRunCoprocessor.class); + + private Region region; + /** + * generate a timestamp that is unique per row in a region this is per region. + */ + private final TimestampGenerator timestampGenerator = + new TimestampGenerator(); + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + this.region = env.getRegion(); + } + } + + /* + * (non-Javadoc) + * + * This method adds the tags onto the cells in the Put. It is presumed that + * all the cells in one Put have the same set of Tags. The existing cell + * timestamp is overwritten for non-metric cells and each such cell gets a new + * unique timestamp generated by {@link TimestampGenerator} + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Put, + * org.apache.hadoop.hbase.regionserver.wal.WALEdit, + * org.apache.hadoop.hbase.client.Durability) + */ + @Override + public void prePut(ObserverContext e, Put put, + WALEdit edit, Durability durability) throws IOException { + Map attributes = put.getAttributesMap(); + // Assumption is that all the cells in a put are the same operation. + List tags = new ArrayList<>(); + if ((attributes != null) && (attributes.size() > 0)) { + for (Map.Entry attribute : attributes.entrySet()) { + Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute); + if (t != null) { + tags.add(t); + } + } + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); + NavigableMap> newFamilyMap = new TreeMap<>( + Bytes.BYTES_COMPARATOR); + for (Map.Entry> entry : put.getFamilyCellMap() + .entrySet()) { + List newCells = new ArrayList<>(entry.getValue().size()); + for (Cell cell : entry.getValue()) { + // for each cell in the put add the tags + // Assumption is that all the cells in + // one put are the same operation + // also, get a unique cell timestamp for non-metric cells + // this way we don't inadvertently overwrite cell versions + long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); + newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), + CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), + tagByteArray)); + } + newFamilyMap.put(entry.getKey(), newCells); + } // for each entry + // Update the family map for the Put + put.setFamilyCellMap(newFamilyMap); + } + } + + /** + * Determines if the current cell's timestamp is to be used or a new unique + * cell timestamp is to be used. The reason this is done is to inadvertently + * overwrite cells when writes come in very fast. But for metric cells, the + * cell timestamp signifies the metric timestamp. Hence we don't want to + * overwrite it. + * + * @param timestamp + * @param tags + * @return cell timestamp + */ + private long getCellTimestamp(long timestamp, List tags) { + // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default) + // then use the generator + if (timestamp == HConstants.LATEST_TIMESTAMP) { + return timestampGenerator.getUniqueTimestamp(); + } else { + return timestamp; + } + } + + /* + * (non-Javadoc) + * + * Creates a {@link FlowScanner} Scan so that it can correctly process the + * contents of {@link FlowRunTable}. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Get, java.util.List) + */ + @Override + public void preGetOp(ObserverContext e, + Get get, List results) throws IOException { + Scan scan = new Scan(get); + scan.setMaxVersions(); + RegionScanner scanner = null; + try { + scanner = new FlowScanner(e.getEnvironment(), scan, + region.getScanner(scan), FlowScannerOperation.READ); + scanner.next(results); + e.bypass(); + } finally { + if (scanner != null) { + scanner.close(); + } + } + } + + /* + * (non-Javadoc) + * + * Ensures that max versions are set for the Scan so that metrics can be + * correctly aggregated and min/max can be correctly determined. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org + * .apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan) + */ + @Override + public void preScannerOpen( + ObserverContext e, Scan scan) + throws IOException { + // set max versions for scan to see all + // versions to aggregate for metrics + scan.setMaxVersions(); + } + + /* + * (non-Javadoc) + * + * Creates a {@link FlowScanner} Scan so that it can correctly process the + * contents of {@link FlowRunTable}. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( + * org.apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan, + * org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner postScannerOpen( + ObserverContext e, Scan scan, + RegionScanner scanner) throws IOException { + return new FlowScanner(e.getEnvironment(), scan, + scanner, FlowScannerOperation.READ); + } + + @Override + public InternalScanner preFlush( + ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker cycleTracker) + throws IOException { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("preFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + return new FlowScanner(c.getEnvironment(), scanner, + FlowScannerOperation.FLUSH); + } + + @Override + public void postFlush(ObserverContext c, + Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("postFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + } + + @Override + public InternalScanner preCompact( + ObserverContext e, Store store, + InternalScanner scanner, ScanType scanType, + CompactionLifeCycleTracker tracker, CompactionRequest request) + throws IOException { + + FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; + if (request != null) { + requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION); + LOG.info("Compactionrequest= " + request.toString() + " " + + requestOp.toString() + " RegionName=" + e.getEnvironment() + .getRegion().getRegionInfo().getRegionNameAsString()); + } + return new FlowScanner(e.getEnvironment(), scanner, requestOp); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java new file mode 100644 index 0000000000..b533624f3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -0,0 +1,723 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invoked via the coprocessor when a Get or a Scan is issued for flow run + * table. Looks through the list of cells per row, checks their tags and does + * operation on those cells as per the cell tags. Transforms reads of the stored + * metrics into calculated sums for each column Also, finds the min and max for + * start and end times in a flow run. + */ +class FlowScanner implements RegionScanner, Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(FlowScanner.class); + + /** + * use a special application id to represent the flow id this is needed since + * TimestampGenerator parses the app id to generate a cell timestamp. + */ + private static final String FLOW_APP_ID = "application_00000000000_0000"; + + private final Region region; + private final InternalScanner flowRunScanner; + private final int batchSize; + private final long appFinalValueRetentionThreshold; + private RegionScanner regionScanner; + private boolean hasMore; + private byte[] currentRow; + private List availableCells = new ArrayList<>(); + private int currentIndex; + private FlowScannerOperation action = FlowScannerOperation.READ; + + FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, + FlowScannerOperation action) { + this(env, null, internalScanner, action); + } + + FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, + InternalScanner internalScanner, FlowScannerOperation action) { + this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch(); + // TODO initialize other scan attributes like Scan#maxResultSize + this.flowRunScanner = internalScanner; + if (internalScanner instanceof RegionScanner) { + this.regionScanner = (RegionScanner) internalScanner; + } + this.action = action; + if (env == null) { + this.appFinalValueRetentionThreshold = + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; + this.region = null; + } else { + this.region = env.getRegion(); + Configuration hbaseConf = env.getConfiguration(); + this.appFinalValueRetentionThreshold = hbaseConf.getLong( + YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); + } + if (LOG.isDebugEnabled()) { + LOG.debug(" batch size=" + batchSize); + } + } + + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() + */ + @Override + public HRegionInfo getRegionInfo() { + return new HRegionInfo(region.getRegionInfo()); + } + + @Override + public boolean nextRaw(List cells) throws IOException { + return nextRaw(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean nextRaw(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + @Override + public boolean next(List cells) throws IOException { + return next(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean next(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + /** + * Get value converter associated with a column or a column prefix. If nothing + * matches, generic converter is returned. + * @param colQualifierBytes + * @return value converter implementation. + */ + private static ValueConverter getValueConverter(byte[] colQualifierBytes) { + // Iterate over all the column prefixes for flow run table and get the + // appropriate converter for the column qualifier passed if prefix matches. + for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { + byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); + if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, + colQualifierBytes, 0, colPrefixBytes.length) == 0) { + return colPrefix.getValueConverter(); + } + } + // Iterate over all the columns for flow run table and get the + // appropriate converter for the column qualifier passed if match occurs. + for (FlowRunColumn column : FlowRunColumn.values()) { + if (Bytes.compareTo( + column.getColumnQualifierBytes(), colQualifierBytes) == 0) { + return column.getValueConverter(); + } + } + // Return generic converter if nothing matches. + return GenericConverter.getInstance(); + } + + /** + * This method loops through the cells in a given row of the + * {@link FlowRunTable}. It looks at the tags of each cell to figure out how + * to process the contents. It then calculates the sum or min or max for each + * column or returns the cell as is. + * + * @param cells + * @param scannerContext + * @return true if next row is available for the scanner, false otherwise + * @throws IOException + */ + private boolean nextInternal(List cells, ScannerContext scannerContext) + throws IOException { + Cell cell = null; + startNext(); + // Loop through all the cells in this row + // For min/max/metrics we do need to scan the entire set of cells to get the + // right one + // But with flush/compaction, the number of cells being scanned will go down + // cells are grouped per column qualifier then sorted by cell timestamp + // (latest to oldest) per column qualifier + // So all cells in one qualifier come one after the other before we see the + // next column qualifier + ByteArrayComparator comp = new ByteArrayComparator(); + byte[] previousColumnQualifier = Separator.EMPTY_BYTES; + AggregationOperation currentAggOp = null; + SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); + Set alreadySeenAggDim = new HashSet<>(); + int addedCnt = 0; + long currentTimestamp = System.currentTimeMillis(); + ValueConverter converter = null; + int limit = batchSize; + + while (limit <= 0 || addedCnt < limit) { + cell = peekAtNextCell(scannerContext); + if (cell == null) { + break; + } + byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell); + if (previousColumnQualifier == null) { + // first time in loop + previousColumnQualifier = currentColumnQualifier; + } + + converter = getValueConverter(currentColumnQualifier); + if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + converter, currentTimestamp); + resetState(currentColumnCells, alreadySeenAggDim); + previousColumnQualifier = currentColumnQualifier; + currentAggOp = getCurrentAggOp(cell); + converter = getValueConverter(currentColumnQualifier); + } + collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, + converter, scannerContext); + nextCell(scannerContext); + } + if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter, + currentTimestamp); + if (LOG.isDebugEnabled()) { + if (addedCnt > 0) { + LOG.debug("emitted cells. " + addedCnt + " for " + this.action + + " rowKey=" + + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0)))); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } + } + return hasMore(); + } + + private AggregationOperation getCurrentAggOp(Cell cell) { + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + // We assume that all the operations for a particular column are the same + return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags); + } + + /** + * resets the parameters to an initialized state for next loop iteration. + */ + private void resetState(SortedSet currentColumnCells, + Set alreadySeenAggDim) { + currentColumnCells.clear(); + alreadySeenAggDim.clear(); + } + + private void collectCells(SortedSet currentColumnCells, + AggregationOperation currentAggOp, Cell cell, + Set alreadySeenAggDim, ValueConverter converter, + ScannerContext scannerContext) throws IOException { + + if (currentAggOp == null) { + // not a min/max/metric cell, so just return it as is + currentColumnCells.add(cell); + return; + } + + switch (currentAggOp) { + case GLOBAL_MIN: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMinCell = currentColumnCells.first(); + Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMinCell.equals(newMinCell)) { + currentColumnCells.remove(currentMinCell); + currentColumnCells.add(newMinCell); + } + } + break; + case GLOBAL_MAX: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMaxCell = currentColumnCells.first(); + Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMaxCell.equals(newMaxCell)) { + currentColumnCells.remove(currentMaxCell); + currentColumnCells.add(newMaxCell); + } + } + break; + case SUM: + case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + + // only if this app has not been seen yet, add to current column cells + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + String aggDim = HBaseTimelineServerUtils + .getAggregationCompactionDimension(tags); + if (!alreadySeenAggDim.contains(aggDim)) { + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + // but when this agg dim is not seen, + // consider this cell in our working set + currentColumnCells.add(cell); + alreadySeenAggDim.add(aggDim); + } + break; + default: + break; + } // end of switch case + } + + /* + * Processes the cells in input param currentColumnCells and populates + * List cells as the output based on the input AggregationOperation + * parameter. + */ + private int emitCells(List cells, SortedSet currentColumnCells, + AggregationOperation currentAggOp, ValueConverter converter, + long currentTimestamp) throws IOException { + if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { + return 0; + } + if (currentAggOp == null) { + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } + + switch (currentAggOp) { + case GLOBAL_MIN: + case GLOBAL_MAX: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case SUM: + case SUM_FINAL: + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, + (NumericValueConverter) converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List finalCells = processSummationMajorCompaction( + currentColumnCells, (NumericValueConverter) converter, + currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + } + + /* + * Returns a cell whose value is the sum of all cell values in the input set. + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. + */ + private Cell processSummation(SortedSet currentColumnCells, + NumericValueConverter converter) throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + long mostCurrentTimestamp = 0L; + Cell mostRecentCell = null; + for (Cell cell : currentColumnCells) { + currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell)); + ts = cell.getTimestamp(); + if (mostCurrentTimestamp < ts) { + mostCurrentTimestamp = ts; + mostRecentCell = cell; + } + sum = converter.add(sum, currentValue); + } + byte[] sumBytes = converter.encodeValue(sum); + Cell sumCell = + HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes); + return sumCell; + } + + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a certain time + * + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. + */ + @VisibleForTesting + List processSummationMajorCompaction( + SortedSet currentColumnCells, NumericValueConverter converter, + long currentTimestamp) + throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + boolean summationDone = false; + List finalCells = new ArrayList(); + if (currentColumnCells == null) { + return finalCells; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("In processSummationMajorCompaction," + + " will drop cells older than " + currentTimestamp + + " CurrentColumnCells size=" + currentColumnCells.size()); + } + + for (Cell cell : currentColumnCells) { + AggregationOperation cellAggOp = getCurrentAggOp(cell); + // if this is the existing flow sum cell + List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + String appId = HBaseTimelineServerUtils + .getAggregationCompactionDimension(tags); + if (appId == FLOW_APP_ID) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("reading flow app id sum=" + sum); + } + } else { + currentValue = (Number) converter.decodeValue(CellUtil + .cloneValue(cell)); + // read the timestamp truncated by the generator + ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); + if ((cellAggOp == AggregationOperation.SUM_FINAL) + && ((ts + this.appFinalValueRetentionThreshold) + < currentTimestamp)) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION loop sum= " + sum + + " discarding now: " + " qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp() + " " + this.action); + } + } else { + // not a final value but it's the latest cell for this app + // so include this cell in the list of cells to write back + finalCells.add(cell); + } + } + } + if (summationDone) { + Cell anyCell = currentColumnCells.first(); + List tags = new ArrayList(); + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + t = HBaseTimelineServerUtils.createTag( + AggregationCompactionDimension.APPLICATION_ID.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); + Cell sumCell = HBaseTimelineServerUtils.createNewCell( + CellUtil.cloneRow(anyCell), + CellUtil.cloneFamily(anyCell), + CellUtil.cloneQualifier(anyCell), + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), FLOW_APP_ID), + converter.encodeValue(sum), tagByteArray); + finalCells.add(sumCell); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " " + this.action); + } + LOG.info("After major compaction for qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with sum=" + sum.longValue() + + " with cell timestamp " + sumCell.getTimestamp()); + } else { + String qualifier = ""; + LOG.info("After major compaction for qualifier=" + qualifier + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with zero sum=" + + sum.longValue()); + } + return finalCells; + } + + /** + * Determines which cell is to be returned based on the values in each cell + * and the comparison operation MIN or MAX. + * + * @param previouslyChosenCell + * @param currentCell + * @param currentAggOp + * @return the cell which is the min (or max) cell + * @throws IOException + */ + private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, + AggregationOperation currentAggOp, NumericValueConverter converter) + throws IOException { + if (previouslyChosenCell == null) { + return currentCell; + } + try { + Number previouslyChosenCellValue = (Number)converter.decodeValue( + CellUtil.cloneValue(previouslyChosenCell)); + Number currentCellValue = (Number) converter.decodeValue(CellUtil + .cloneValue(currentCell)); + switch (currentAggOp) { + case GLOBAL_MIN: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) < 0) { + // new value is minimum, hence return this cell + return currentCell; + } else { + // previously chosen value is miniumum, hence return previous min cell + return previouslyChosenCell; + } + case GLOBAL_MAX: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) > 0) { + // new value is max, hence return this cell + return currentCell; + } else { + // previously chosen value is max, hence return previous max cell + return previouslyChosenCell; + } + default: + return currentCell; + } + } catch (IllegalArgumentException iae) { + LOG.error("caught iae during conversion to long ", iae); + return currentCell; + } + } + + @Override + public void close() throws IOException { + if (flowRunScanner != null) { + flowRunScanner.close(); + } else { + LOG.warn("scanner close called but scanner is null"); + } + } + + /** + * Called to signal the start of the next() call by the scanner. + */ + public void startNext() { + currentRow = null; + } + + /** + * Returns whether or not the underlying scanner has more rows. + */ + public boolean hasMore() { + return currentIndex < availableCells.size() ? true : hasMore; + } + + /** + * Returns the next available cell for the current row and advances the + * pointer to the next cell. This method can be called multiple times in a row + * to advance through all the available cells. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException + */ + public Cell nextCell(ScannerContext scannerContext) throws IOException { + Cell cell = peekAtNextCell(scannerContext); + if (cell != null) { + currentIndex++; + } + return cell; + } + + /** + * Returns the next available cell for the current row, without advancing the + * pointer. Calling this method multiple times in a row will continue to + * return the same cell. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException if any problem is encountered while grabbing the next + * cell. + */ + public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException { + if (currentIndex >= availableCells.size()) { + // done with current batch + availableCells.clear(); + currentIndex = 0; + hasMore = flowRunScanner.next(availableCells, scannerContext); + } + Cell cell = null; + if (currentIndex < availableCells.size()) { + cell = availableCells.get(currentIndex); + if (currentRow == null) { + currentRow = CellUtil.cloneRow(cell); + } else if (!CellUtil.matchingRow(cell, currentRow)) { + // moved on to the next row + // don't use the current cell + // also signal no more cells for this row + return null; + } + } + return cell; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() + */ + @Override + public long getMaxResultSize() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's scanner is not a RegionScanner"); + } + return regionScanner.getMaxResultSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() + */ + @Override + public long getMvccReadPoint() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMvccReadPoint(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() + */ + @Override + public boolean isFilterDone() throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.isFilterDone(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) + */ + @Override + public boolean reseek(byte[] bytes) throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.reseek() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.reseek(bytes); + } + + @Override + public int getBatch() { + return batchSize; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java new file mode 100644 index 0000000000..73c666fa9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + + +/** + * Identifies the scanner operation on the {@link FlowRunTable}. + */ +public enum FlowScannerOperation { + + /** + * If the scanner is opened for reading + * during preGet or preScan. + */ + READ, + + /** + * If the scanner is opened during preFlush. + */ + FLUSH, + + /** + * If the scanner is opened during minor Compaction. + */ + MINOR_COMPACTION, + + /** + * If the scanner is opened during major Compaction. + */ + MAJOR_COMPACTION +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java new file mode 100644 index 0000000000..04963f3f1d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow + * contains classes related to implementation for flow related tables, viz. flow + * run table and flow activity table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java new file mode 100644 index 0000000000..e78db2a1ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains + * classes which define and implement reading and writing to backend storage. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml index d06907d5d9..3ca443f81b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml @@ -24,138 +24,36 @@ org.apache.hadoop 3.2.0-SNAPSHOT - 4.0.0 + hadoop-yarn-server-timelineservice-hbase-server - Apache Hadoop YARN TimelineService HBase Server 3.2.0-SNAPSHOT + Apache Hadoop YARN TimelineService HBase Servers + pom - - - ${project.parent.parent.parent.basedir} - - - - - commons-logging - commons-logging - - - - com.google.guava - guava - - - - org.apache.hadoop - hadoop-common - provided - - - - org.apache.hadoop - hadoop-yarn-api - provided - - - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-common - - - - org.apache.hbase - hbase-common - provided - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty-util - - - - - - org.apache.hbase - hbase-client - provided - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - - - - org.apache.hbase - hbase-server - provided - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-hdfs-client - - - org.apache.hadoop - hadoop-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - jetty-sslengine - - - - - - - - - maven-assembly-plugin - - src/assembly/coprocessor.xml - true - - - - create-coprocessor-jar - prepare-package - - single - - - - - - maven-jar-plugin - - - - test-jar - - - - - - + + + hbase1 + + + !hbase.profile + + + + hadoop-yarn-server-timelineservice-hbase-server-1 + + + + hbase2 + + + hbase.profile + 2.0 + + + + hadoop-yarn-server-timelineservice-hbase-server-2 + + + \ No newline at end of file