From 10663b78c8596693322dc3636f173035195bf607 Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Wed, 28 Feb 2018 21:11:36 -0800 Subject: [PATCH] Revert "yarn-7346.07.patch" This reverts commit 5e37ca5bb49f945e27f49a413d08baab562dfa9c. --- .../resources/assemblies/hadoop-yarn-dist.xml | 2 +- hadoop-project/pom.xml | 68 +- .../pom.xml | 78 +- .../storage/flow/TestHBaseStorageFlowRun.java | 19 +- .../TestHBaseStorageFlowRunCompaction.java | 103 ++- .../pom.xml | 6 +- .../pom.xml | 186 ----- .../src/assembly/coprocessor.xml | 38 - .../pom.xml | 194 ----- .../common/HBaseTimelineServerUtils.java | 224 ------ .../storage/common/package-info.java | 28 - .../storage/flow/FlowRunCoprocessor.java | 285 ------- .../storage/flow/FlowScanner.java | 723 ------------------ .../storage/flow/FlowScannerOperation.java | 46 -- .../storage/flow/package-info.java | 29 - .../timelineservice/storage/package-info.java | 28 - .../pom.xml | 158 +++- .../src/assembly/coprocessor.xml | 5 +- .../common/HBaseTimelineServerUtils.java | 91 +-- .../storage/common/package-info.java | 0 .../storage/flow/FlowRunCoprocessor.java | 3 +- .../storage/flow/FlowScanner.java | 18 +- .../storage/flow/FlowScannerOperation.java | 0 .../storage/flow/package-info.java | 0 .../timelineservice/storage/package-info.java | 0 25 files changed, 231 insertions(+), 2101 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-2 => }/src/assembly/coprocessor.xml (91%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java (60%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java (100%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java (99%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java (97%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java (100%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java (100%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/{hadoop-yarn-server-timelineservice-hbase-server-1 => }/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java (100%) diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 382c967263..2c266b617f 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -236,7 +236,7 @@ - org.apache.hadoop:${hbase-server-artifactid} + org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server share/hadoop/${hadoop.component}/timelineservice diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index d23a548f5a..7cc68bb691 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,6 +49,9 @@ 2.11.0 0.8.2.1 + 1.2.6 + 2.5.1 + 11.0.2 ${project.version} 1.0.13 @@ -404,6 +407,12 @@ ${project.version} + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-server + ${project.version} + + org.apache.hadoop hadoop-yarn-applications-distributedshell @@ -657,6 +666,7 @@ jsp-api 2.1 + org.glassfish javax.servlet @@ -1829,64 +1839,6 @@ - - - hbase1 - - - !hbase.profile - - - - 1.2.6 - 2.5.1 - 11.0.2 - hadoop-yarn-server-timelineservice-hbase-server-1 - - - - - org.apache.hadoop - ${hbase-server-artifactid} - ${project.version} - - - - - - - hbase2 - - - hbase.profile - 2.0 - - - - 2.0.0-beta-1 - 3.0.0 - 11.0.2 - hadoop-yarn-server-timelineservice-hbase-server-2 - - - - - org.apache.hadoop - ${hbase-server-artifactid} - ${project.version} - - - org.jruby.jcodings - jcodings - 1.0.13 - - - - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index 2c8d5dd241..d9f992d056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -27,7 +27,7 @@ 4.0.0 hadoop-yarn-server-timelineservice-hbase-tests 3.2.0-SNAPSHOT - Apache Hadoop YARN TimelineService HBase tests + Apache Hadoop YARN Timeline Service HBase tests @@ -82,6 +82,18 @@ + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-server + test + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop hadoop-common @@ -402,68 +414,4 @@ - - - - hbase1 - - - !hbase.profile - - - - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-server-1 - test - - - org.apache.hadoop - hadoop-common - - - - - - - - hbase2 - - - hbase.profile - 2.0 - - - - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-server-2 - test - - - org.apache.hadoop - hadoop-common - - - - - - org.apache.hadoop - hadoop-hdfs-client - ${hbase-compatible-hadoop.version} - test - - - - - org.mockito - mockito-all - test - - - - - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index c7d0d4e3e2..622b0eba22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -62,7 +64,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -88,7 +89,7 @@ public static void setupBeforeClass() throws Exception { } @Test - public void checkCoProcessorOff() throws Exception, InterruptedException { + public void checkCoProcessorOff() throws IOException, InterruptedException { Configuration hbaseConf = util.getConfiguration(); TableName table = BaseTableRW.getTableName(hbaseConf, FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); @@ -126,9 +127,19 @@ public void checkCoProcessorOff() throws Exception, InterruptedException { } private void checkCoprocessorExists(TableName table, boolean exists) - throws Exception { + throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); - HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists); + List regions = server.getOnlineRegions(table); + for (Region region : regions) { + boolean found = false; + Set coprocs = region.getCoprocessorHost().getCoprocessors(); + for (String coprocName : coprocs) { + if (coprocName.contains("FlowRunCoprocessor")) { + found = true; + } + } + assertEquals(found, exists); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 2ff37afc98..31be285e41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -45,7 +44,9 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -326,15 +327,20 @@ public void testWriteFlowRunCompaction() throws Exception { } // check in flow run table - TableName flowRunTable = BaseTableRW.getTableName(c1, - FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); - HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable); - - // flush and compact all the regions of the primary table - int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions( - server, flowRunTable); + HRegionServer server = util.getRSForFirstRegionInTable( + BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME, + FlowRunTableRW.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions( + BaseTableRW.getTableName(c1, + FlowRunTableRW.TABLE_NAME_CONF_NAME, + FlowRunTableRW.DEFAULT_TABLE_NAME)); assertTrue("Didn't find any regions for primary table!", - regionNum > 0); + regions.size() > 0); + // flush and compact all the regions of the primary table + for (Region region : regions) { + region.flush(true); + region.compact(true); + } // check flow run for one flow many apps checkFlowRunTable(cluster, user, flow, runid, c1, 4); @@ -386,10 +392,13 @@ private void checkFlowRunTable(String cluster, String user, String flow, private FlowScanner getFlowScannerForTestingCompaction() { // create a FlowScanner object with the sole purpose of invoking a process // summation; + CompactionRequest request = new CompactionRequest(); + request.setIsMajor(true, true); // okay to pass in nulls for the constructor arguments // because all we want to do is invoke the process summation FlowScanner fs = new FlowScanner(null, null, - FlowScannerOperation.MAJOR_COMPACTION); + (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION)); assertNotNull(fs); return fs; } @@ -414,45 +423,40 @@ public void checkProcessSummationMoreCellsSumFinal2() SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); List tags = new ArrayList<>(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_91188"); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_12700000001_29102"); tags.add(t); - tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); + tagByteArray = Tag.fromList(tags); // create a cell with a recent timestamp and attribute SUM_FINAL Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + t = new Tag(AggregationOperation.SUM.getTagType(), "application_191780000000001_8195"); tags.add(t); - tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); + tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); currentColumnCells.add(c3); tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + t = new Tag(AggregationOperation.SUM.getTagType(), "application_191780000000001_98104"); tags.add(t); - tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); + tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); @@ -519,12 +523,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { // insert SUM_FINAL cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -535,12 +537,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { // add SUM cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + t = new Tag(AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -614,12 +614,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // insert SUM_FINAL cells which will expire for (int i = 0; i < countFinal; i++) { tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -630,12 +628,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // insert SUM_FINAL cells which will NOT expire for (int i = 0; i < countFinalNotExpire; i++) { tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -646,12 +642,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() // add SUM cells for (int i = 0; i < countNotFinal; i++) { tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + t = new Tag(AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -703,12 +697,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { long cellValue2 = 28L; List tags = new ArrayList<>(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_999888"); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp and attribute SUM_FINAL @@ -717,11 +709,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { currentColumnCells.add(c1); tags = new ArrayList<>(); - t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + t = new Tag(AggregationOperation.SUM.getTagType(), "application_100000000001_119101"); tags.add(t); - tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); + tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, @@ -764,12 +755,10 @@ public void testProcessSummationOneCellSumFinal() throws IOException { // note down the current timestamp long currentTimestamp = System.currentTimeMillis(); List tags = new ArrayList<>(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp @@ -804,12 +793,10 @@ public void testProcessSummationOneCell() throws IOException { // try for 1 cell with tag SUM List tags = new ArrayList<>(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM.getTagType(), + Tag t = new Tag(AggregationOperation.SUM.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml index 3602f02003..a1db497111 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml @@ -40,8 +40,8 @@ - org.slf4j - slf4j-api + commons-logging + commons-logging @@ -93,7 +93,7 @@ org.apache.hadoop - hadoop-yarn-server-common + hadoop-yarn-server-applicationhistoryservice provided diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml deleted file mode 100644 index df7c5e3f06..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml +++ /dev/null @@ -1,186 +0,0 @@ - - - - - hadoop-yarn-server-timelineservice-hbase-server - org.apache.hadoop - 3.2.0-SNAPSHOT - - - 4.0.0 - hadoop-yarn-server-timelineservice-hbase-server-1 - Apache Hadoop YARN TimelineService HBase Server 1.2 - 3.2.0-SNAPSHOT - - - - ${project.parent.parent.parent.parent.basedir} - - - - - default - - true - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - default-compile - none - - - - - - - - - hbase1 - - - !hbase.profile - - - - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-common - - - - org.slf4j - slf4j-api - - - - com.google.guava - guava - - - - org.apache.hadoop - hadoop-annotations - provided - - - - org.apache.hadoop - hadoop-common - provided - - - - org.apache.hadoop - hadoop-yarn-api - provided - - - - org.apache.hbase - hbase-common - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty-util - - - - - - org.apache.hbase - hbase-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - - - - org.apache.hbase - hbase-server - provided - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-hdfs-client - - - org.apache.hadoop - hadoop-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - jetty-sslengine - - - - - - - - - maven-assembly-plugin - - src/assembly/coprocessor.xml - true - - - - create-coprocessor-jar - prepare-package - - single - - - - - - - - - \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml deleted file mode 100644 index dd53bf22c2..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - coprocessor - - jar - - false - - - / - true - true - runtime - - org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common - org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1 - - - - \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml deleted file mode 100644 index 0b2cb2fed6..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/pom.xml +++ /dev/null @@ -1,194 +0,0 @@ - - - - - hadoop-yarn-server-timelineservice-hbase-server - org.apache.hadoop - 3.2.0-SNAPSHOT - - 4.0.0 - - hadoop-yarn-server-timelineservice-hbase-server-2 - Apache Hadoop YARN TimelineService HBase Server 2.0 - 3.2.0-SNAPSHOT - - - - ${project.parent.parent.parent.parent.basedir} - - - - - default - - true - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - default-compile - none - - - - - - - - - hbase2 - - - hbase.profile - 2.0 - - - - - org.apache.hadoop - hadoop-yarn-server-timelineservice-hbase-common - - - - org.slf4j - slf4j-api - - - - com.google.guava - guava - - - - org.apache.hadoop - hadoop-annotations - provided - - - - org.apache.hadoop - hadoop-common - provided - - - - org.apache.hadoop - hadoop-yarn-api - provided - - - - org.apache.hbase - hbase-common - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty-util - - - - - - org.apache.hbase - hbase-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - - - - - org.jruby.jcodings - jcodings - - - - org.apache.hbase - hbase-server - provided - - - org.apache.hadoop - hadoop-hdfs - - - org.apache.hadoop - hadoop-hdfs-client - - - org.apache.hadoop - hadoop-client - - - org.apache.hadoop - hadoop-mapreduce-client-core - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - jetty-sslengine - - - - - - - - - maven-assembly-plugin - - - create-coprocessor-jar - prepare-package - - single - - - src/assembly/coprocessor.xml - true - - - - - - - - - \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java deleted file mode 100644 index cf2d5e0335..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.hbase.ArrayBackedTag; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagUtil; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * A utility class used by hbase-server module. - */ -public final class HBaseTimelineServerUtils { - private HBaseTimelineServerUtils() { - } - - /** - * Creates a {@link Tag} from the input attribute. - * - * @param attribute Attribute from which tag has to be fetched. - * @return a HBase Tag. - */ - public static Tag getTagFromAttribute(Map.Entry attribute) { - // attribute could be either an Aggregation Operation or - // an Aggregation Dimension - // Get the Tag type from either - AggregationOperation aggOp = AggregationOperation - .getAggregationOperation(attribute.getKey()); - if (aggOp != null) { - Tag t = createTag(aggOp.getTagType(), attribute.getValue()); - return t; - } - - AggregationCompactionDimension aggCompactDim = - AggregationCompactionDimension.getAggregationCompactionDimension( - attribute.getKey()); - if (aggCompactDim != null) { - Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue()); - return t; - } - return null; - } - - /** - * creates a new cell based on the input cell but with the new value. - * - * @param origCell Original cell - * @param newValue new cell value - * @return cell - * @throws IOException while creating new cell. - */ - public static Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - - /** - * creates a cell with the given inputs. - * - * @param row row of the cell to be created - * @param family column family name of the new cell - * @param qualifier qualifier for the new cell - * @param ts timestamp of the new cell - * @param newValue value of the new cell - * @param tags tags in the new cell - * @return cell - * @throws IOException while creating the cell. - */ - public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, - long ts, byte[] newValue, byte[] tags) throws IOException { - return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, - newValue, tags); - } - - /** - * Create a Tag. - * @param tagType tag type - * @param tag the content of the tag in byte array. - * @return an instance of Tag - */ - public static Tag createTag(byte tagType, byte[] tag) { - return new ArrayBackedTag(tagType, tag); - } - - /** - * Create a Tag. - * @param tagType tag type - * @param tag the content of the tag in String. - * @return an instance of Tag - */ - public static Tag createTag(byte tagType, String tag) { - return createTag(tagType, Bytes.toBytes(tag)); - } - - /** - * Convert a cell to a list of tags. - * @param cell the cell to convert - * @return a list of tags - */ - public static List convertCellAsTagList(Cell cell) { - return TagUtil.asList( - cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - } - - /** - * Convert a list of tags to a byte array. - * @param tags the list of tags to convert - * @return byte array representation of the list of tags - */ - public static byte[] convertTagListToByteArray(List tags) { - return TagUtil.fromList(tags); - } - - /** - * returns app id from the list of tags. - * - * @param tags cell tags to be looked into - * @return App Id as the AggregationCompactionDimension - */ - public static String getAggregationCompactionDimension(List tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(Tag.cloneValue(t)); - return appId; - } - } - return appId; - } - - /** - * Returns the first seen aggregation operation as seen in the list of input - * tags or null otherwise. - * - * @param tags list of HBase tags. - * @return AggregationOperation - */ - public static AggregationOperation getAggregationOperationFromTagsList( - List tags) { - for (AggregationOperation aggOp : AggregationOperation.values()) { - for (Tag tag : tags) { - if (tag.getType() == aggOp.getTagType()) { - return aggOp; - } - } - } - return null; - } - - // flush and compact all the regions of the primary table - - /** - * Flush and compact all regions of a table. - * @param server region server - * @param table the table to flush and compact - * @return the number of regions flushed and compacted - */ - public static int flushCompactTableRegions(HRegionServer server, - TableName table) throws IOException { - List regions = server.getRegions(table); - for (HRegion region : regions) { - region.flush(true); - region.compact(true); - } - return regions.size(); - } - - /** - * Check the existence of FlowRunCoprocessor in a table. - * @param server region server - * @param table table to check - * @param existenceExpected true if the FlowRunCoprocessor is expected - * to be loaded in the table, false otherwise - * @throws Exception - */ - public static void validateFlowRunCoprocessor(HRegionServer server, - TableName table, boolean existenceExpected) throws Exception { - List regions = server.getRegions(table); - for (HRegion region : regions) { - boolean found = false; - Set coprocs = region.getCoprocessorHost().getCoprocessors(); - for (String coprocName : coprocs) { - if (coprocName.contains("FlowRunCoprocessor")) { - found = true; - } - } - if (found != existenceExpected) { - throw new Exception("FlowRunCoprocessor is" + - (existenceExpected ? " not " : " ") + "loaded in table " + table); - } - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java deleted file mode 100644 index 0df5b8af84..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains - * a set of utility classes used across backend storage reader and writer. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java deleted file mode 100644 index 16caca4f5f..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.TreeMap; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Coprocessor for flow run table. - */ -public class FlowRunCoprocessor implements RegionCoprocessor, RegionObserver { - - private static final Logger LOG = - LoggerFactory.getLogger(FlowRunCoprocessor.class); - - private Region region; - /** - * generate a timestamp that is unique per row in a region this is per region. - */ - private final TimestampGenerator timestampGenerator = - new TimestampGenerator(); - - @Override - public Optional getRegionObserver() { - return Optional.of(this); - } - - @Override - public void start(CoprocessorEnvironment e) throws IOException { - if (e instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - this.region = env.getRegion(); - } - } - - /* - * (non-Javadoc) - * - * This method adds the tags onto the cells in the Put. It is presumed that - * all the cells in one Put have the same set of Tags. The existing cell - * timestamp is overwritten for non-metric cells and each such cell gets a new - * unique timestamp generated by {@link TimestampGenerator} - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Put, - * org.apache.hadoop.hbase.regionserver.wal.WALEdit, - * org.apache.hadoop.hbase.client.Durability) - */ - @Override - public void prePut(ObserverContext e, Put put, - WALEdit edit, Durability durability) throws IOException { - Map attributes = put.getAttributesMap(); - // Assumption is that all the cells in a put are the same operation. - List tags = new ArrayList<>(); - if ((attributes != null) && (attributes.size() > 0)) { - for (Map.Entry attribute : attributes.entrySet()) { - Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute); - if (t != null) { - tags.add(t); - } - } - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); - NavigableMap> newFamilyMap = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for (Map.Entry> entry : put.getFamilyCellMap() - .entrySet()) { - List newCells = new ArrayList<>(entry.getValue().size()); - for (Cell cell : entry.getValue()) { - // for each cell in the put add the tags - // Assumption is that all the cells in - // one put are the same operation - // also, get a unique cell timestamp for non-metric cells - // this way we don't inadvertently overwrite cell versions - long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); - newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), - CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), - cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), - tagByteArray)); - } - newFamilyMap.put(entry.getKey(), newCells); - } // for each entry - // Update the family map for the Put - put.setFamilyCellMap(newFamilyMap); - } - } - - /** - * Determines if the current cell's timestamp is to be used or a new unique - * cell timestamp is to be used. The reason this is done is to inadvertently - * overwrite cells when writes come in very fast. But for metric cells, the - * cell timestamp signifies the metric timestamp. Hence we don't want to - * overwrite it. - * - * @param timestamp - * @param tags - * @return cell timestamp - */ - private long getCellTimestamp(long timestamp, List tags) { - // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default) - // then use the generator - if (timestamp == HConstants.LATEST_TIMESTAMP) { - return timestampGenerator.getUniqueTimestamp(); - } else { - return timestamp; - } - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Get, java.util.List) - */ - @Override - public void preGetOp(ObserverContext e, - Get get, List results) throws IOException { - Scan scan = new Scan(get); - scan.setMaxVersions(); - RegionScanner scanner = null; - try { - scanner = new FlowScanner(e.getEnvironment(), scan, - region.getScanner(scan), FlowScannerOperation.READ); - scanner.next(results); - e.bypass(); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } - - /* - * (non-Javadoc) - * - * Ensures that max versions are set for the Scan so that metrics can be - * correctly aggregated and min/max can be correctly determined. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org - * .apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan) - */ - @Override - public void preScannerOpen( - ObserverContext e, Scan scan) - throws IOException { - // set max versions for scan to see all - // versions to aggregate for metrics - scan.setMaxVersions(); - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( - * org.apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan, - * org.apache.hadoop.hbase.regionserver.RegionScanner) - */ - @Override - public RegionScanner postScannerOpen( - ObserverContext e, Scan scan, - RegionScanner scanner) throws IOException { - return new FlowScanner(e.getEnvironment(), scan, - scanner, FlowScannerOperation.READ); - } - - @Override - public InternalScanner preFlush( - ObserverContext c, Store store, - InternalScanner scanner, FlushLifeCycleTracker cycleTracker) - throws IOException { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("preFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - return new FlowScanner(c.getEnvironment(), scanner, - FlowScannerOperation.FLUSH); - } - - @Override - public void postFlush(ObserverContext c, - Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("postFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - } - - @Override - public InternalScanner preCompact( - ObserverContext e, Store store, - InternalScanner scanner, ScanType scanType, - CompactionLifeCycleTracker tracker, CompactionRequest request) - throws IOException { - - FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; - if (request != null) { - requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION); - LOG.info("Compactionrequest= " + request.toString() + " " - + requestOp.toString() + " RegionName=" + e.getEnvironment() - .getRegion().getRegionInfo().getRegionNameAsString()); - } - return new FlowScanner(e.getEnvironment(), scanner, requestOp); - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java deleted file mode 100644 index b533624f3e..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ /dev/null @@ -1,723 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Invoked via the coprocessor when a Get or a Scan is issued for flow run - * table. Looks through the list of cells per row, checks their tags and does - * operation on those cells as per the cell tags. Transforms reads of the stored - * metrics into calculated sums for each column Also, finds the min and max for - * start and end times in a flow run. - */ -class FlowScanner implements RegionScanner, Closeable { - - private static final Logger LOG = - LoggerFactory.getLogger(FlowScanner.class); - - /** - * use a special application id to represent the flow id this is needed since - * TimestampGenerator parses the app id to generate a cell timestamp. - */ - private static final String FLOW_APP_ID = "application_00000000000_0000"; - - private final Region region; - private final InternalScanner flowRunScanner; - private final int batchSize; - private final long appFinalValueRetentionThreshold; - private RegionScanner regionScanner; - private boolean hasMore; - private byte[] currentRow; - private List availableCells = new ArrayList<>(); - private int currentIndex; - private FlowScannerOperation action = FlowScannerOperation.READ; - - FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, - FlowScannerOperation action) { - this(env, null, internalScanner, action); - } - - FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, - InternalScanner internalScanner, FlowScannerOperation action) { - this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch(); - // TODO initialize other scan attributes like Scan#maxResultSize - this.flowRunScanner = internalScanner; - if (internalScanner instanceof RegionScanner) { - this.regionScanner = (RegionScanner) internalScanner; - } - this.action = action; - if (env == null) { - this.appFinalValueRetentionThreshold = - YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; - this.region = null; - } else { - this.region = env.getRegion(); - Configuration hbaseConf = env.getConfiguration(); - this.appFinalValueRetentionThreshold = hbaseConf.getLong( - YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, - YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); - } - if (LOG.isDebugEnabled()) { - LOG.debug(" batch size=" + batchSize); - } - } - - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() - */ - @Override - public HRegionInfo getRegionInfo() { - return new HRegionInfo(region.getRegionInfo()); - } - - @Override - public boolean nextRaw(List cells) throws IOException { - return nextRaw(cells, ScannerContext.newBuilder().build()); - } - - @Override - public boolean nextRaw(List cells, ScannerContext scannerContext) - throws IOException { - return nextInternal(cells, scannerContext); - } - - @Override - public boolean next(List cells) throws IOException { - return next(cells, ScannerContext.newBuilder().build()); - } - - @Override - public boolean next(List cells, ScannerContext scannerContext) - throws IOException { - return nextInternal(cells, scannerContext); - } - - /** - * Get value converter associated with a column or a column prefix. If nothing - * matches, generic converter is returned. - * @param colQualifierBytes - * @return value converter implementation. - */ - private static ValueConverter getValueConverter(byte[] colQualifierBytes) { - // Iterate over all the column prefixes for flow run table and get the - // appropriate converter for the column qualifier passed if prefix matches. - for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { - byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); - if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, - colQualifierBytes, 0, colPrefixBytes.length) == 0) { - return colPrefix.getValueConverter(); - } - } - // Iterate over all the columns for flow run table and get the - // appropriate converter for the column qualifier passed if match occurs. - for (FlowRunColumn column : FlowRunColumn.values()) { - if (Bytes.compareTo( - column.getColumnQualifierBytes(), colQualifierBytes) == 0) { - return column.getValueConverter(); - } - } - // Return generic converter if nothing matches. - return GenericConverter.getInstance(); - } - - /** - * This method loops through the cells in a given row of the - * {@link FlowRunTable}. It looks at the tags of each cell to figure out how - * to process the contents. It then calculates the sum or min or max for each - * column or returns the cell as is. - * - * @param cells - * @param scannerContext - * @return true if next row is available for the scanner, false otherwise - * @throws IOException - */ - private boolean nextInternal(List cells, ScannerContext scannerContext) - throws IOException { - Cell cell = null; - startNext(); - // Loop through all the cells in this row - // For min/max/metrics we do need to scan the entire set of cells to get the - // right one - // But with flush/compaction, the number of cells being scanned will go down - // cells are grouped per column qualifier then sorted by cell timestamp - // (latest to oldest) per column qualifier - // So all cells in one qualifier come one after the other before we see the - // next column qualifier - ByteArrayComparator comp = new ByteArrayComparator(); - byte[] previousColumnQualifier = Separator.EMPTY_BYTES; - AggregationOperation currentAggOp = null; - SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); - Set alreadySeenAggDim = new HashSet<>(); - int addedCnt = 0; - long currentTimestamp = System.currentTimeMillis(); - ValueConverter converter = null; - int limit = batchSize; - - while (limit <= 0 || addedCnt < limit) { - cell = peekAtNextCell(scannerContext); - if (cell == null) { - break; - } - byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell); - if (previousColumnQualifier == null) { - // first time in loop - previousColumnQualifier = currentColumnQualifier; - } - - converter = getValueConverter(currentColumnQualifier); - if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) { - addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - converter, currentTimestamp); - resetState(currentColumnCells, alreadySeenAggDim); - previousColumnQualifier = currentColumnQualifier; - currentAggOp = getCurrentAggOp(cell); - converter = getValueConverter(currentColumnQualifier); - } - collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, - converter, scannerContext); - nextCell(scannerContext); - } - if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) { - addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter, - currentTimestamp); - if (LOG.isDebugEnabled()) { - if (addedCnt > 0) { - LOG.debug("emitted cells. " + addedCnt + " for " + this.action - + " rowKey=" - + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0)))); - } else { - LOG.debug("emitted no cells for " + this.action); - } - } - } - return hasMore(); - } - - private AggregationOperation getCurrentAggOp(Cell cell) { - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); - // We assume that all the operations for a particular column are the same - return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags); - } - - /** - * resets the parameters to an initialized state for next loop iteration. - */ - private void resetState(SortedSet currentColumnCells, - Set alreadySeenAggDim) { - currentColumnCells.clear(); - alreadySeenAggDim.clear(); - } - - private void collectCells(SortedSet currentColumnCells, - AggregationOperation currentAggOp, Cell cell, - Set alreadySeenAggDim, ValueConverter converter, - ScannerContext scannerContext) throws IOException { - - if (currentAggOp == null) { - // not a min/max/metric cell, so just return it as is - currentColumnCells.add(cell); - return; - } - - switch (currentAggOp) { - case GLOBAL_MIN: - if (currentColumnCells.size() == 0) { - currentColumnCells.add(cell); - } else { - Cell currentMinCell = currentColumnCells.first(); - Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, - (NumericValueConverter) converter); - if (!currentMinCell.equals(newMinCell)) { - currentColumnCells.remove(currentMinCell); - currentColumnCells.add(newMinCell); - } - } - break; - case GLOBAL_MAX: - if (currentColumnCells.size() == 0) { - currentColumnCells.add(cell); - } else { - Cell currentMaxCell = currentColumnCells.first(); - Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, - (NumericValueConverter) converter); - if (!currentMaxCell.equals(newMaxCell)) { - currentColumnCells.remove(currentMaxCell); - currentColumnCells.add(newMaxCell); - } - } - break; - case SUM: - case SUM_FINAL: - if (LOG.isTraceEnabled()) { - LOG.trace("In collect cells " - + " FlowSannerOperation=" - + this.action - + " currentAggOp=" - + currentAggOp - + " cell qualifier=" - + Bytes.toString(CellUtil.cloneQualifier(cell)) - + " cell value= " - + converter.decodeValue(CellUtil.cloneValue(cell)) - + " timestamp=" + cell.getTimestamp()); - } - - // only if this app has not been seen yet, add to current column cells - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); - String aggDim = HBaseTimelineServerUtils - .getAggregationCompactionDimension(tags); - if (!alreadySeenAggDim.contains(aggDim)) { - // if this agg dimension has already been seen, - // since they show up in sorted order - // we drop the rest which are older - // in other words, this cell is older than previously seen cells - // for that agg dim - // but when this agg dim is not seen, - // consider this cell in our working set - currentColumnCells.add(cell); - alreadySeenAggDim.add(aggDim); - } - break; - default: - break; - } // end of switch case - } - - /* - * Processes the cells in input param currentColumnCells and populates - * List cells as the output based on the input AggregationOperation - * parameter. - */ - private int emitCells(List cells, SortedSet currentColumnCells, - AggregationOperation currentAggOp, ValueConverter converter, - long currentTimestamp) throws IOException { - if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { - return 0; - } - if (currentAggOp == null) { - cells.addAll(currentColumnCells); - return currentColumnCells.size(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("In emitCells " + this.action + " currentColumnCells size= " - + currentColumnCells.size() + " currentAggOp" + currentAggOp); - } - - switch (currentAggOp) { - case GLOBAL_MIN: - case GLOBAL_MAX: - cells.addAll(currentColumnCells); - return currentColumnCells.size(); - case SUM: - case SUM_FINAL: - switch (action) { - case FLUSH: - case MINOR_COMPACTION: - cells.addAll(currentColumnCells); - return currentColumnCells.size(); - case READ: - Cell sumCell = processSummation(currentColumnCells, - (NumericValueConverter) converter); - cells.add(sumCell); - return 1; - case MAJOR_COMPACTION: - List finalCells = processSummationMajorCompaction( - currentColumnCells, (NumericValueConverter) converter, - currentTimestamp); - cells.addAll(finalCells); - return finalCells.size(); - default: - cells.addAll(currentColumnCells); - return currentColumnCells.size(); - } - default: - cells.addAll(currentColumnCells); - return currentColumnCells.size(); - } - } - - /* - * Returns a cell whose value is the sum of all cell values in the input set. - * The new cell created has the timestamp of the most recent metric cell. The - * sum of a metric for a flow run is the summation at the point of the last - * metric update in that flow till that time. - */ - private Cell processSummation(SortedSet currentColumnCells, - NumericValueConverter converter) throws IOException { - Number sum = 0; - Number currentValue = 0; - long ts = 0L; - long mostCurrentTimestamp = 0L; - Cell mostRecentCell = null; - for (Cell cell : currentColumnCells) { - currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell)); - ts = cell.getTimestamp(); - if (mostCurrentTimestamp < ts) { - mostCurrentTimestamp = ts; - mostRecentCell = cell; - } - sum = converter.add(sum, currentValue); - } - byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = - HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes); - return sumCell; - } - - - /** - * Returns a list of cells that contains - * - * A) the latest cells for applications that haven't finished yet - * B) summation - * for the flow, based on applications that have completed and are older than - * a certain time - * - * The new cell created has the timestamp of the most recent metric cell. The - * sum of a metric for a flow run is the summation at the point of the last - * metric update in that flow till that time. - */ - @VisibleForTesting - List processSummationMajorCompaction( - SortedSet currentColumnCells, NumericValueConverter converter, - long currentTimestamp) - throws IOException { - Number sum = 0; - Number currentValue = 0; - long ts = 0L; - boolean summationDone = false; - List finalCells = new ArrayList(); - if (currentColumnCells == null) { - return finalCells; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("In processSummationMajorCompaction," - + " will drop cells older than " + currentTimestamp - + " CurrentColumnCells size=" + currentColumnCells.size()); - } - - for (Cell cell : currentColumnCells) { - AggregationOperation cellAggOp = getCurrentAggOp(cell); - // if this is the existing flow sum cell - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); - String appId = HBaseTimelineServerUtils - .getAggregationCompactionDimension(tags); - if (appId == FLOW_APP_ID) { - sum = converter.add(sum, currentValue); - summationDone = true; - if (LOG.isTraceEnabled()) { - LOG.trace("reading flow app id sum=" + sum); - } - } else { - currentValue = (Number) converter.decodeValue(CellUtil - .cloneValue(cell)); - // read the timestamp truncated by the generator - ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); - if ((cellAggOp == AggregationOperation.SUM_FINAL) - && ((ts + this.appFinalValueRetentionThreshold) - < currentTimestamp)) { - sum = converter.add(sum, currentValue); - summationDone = true; - if (LOG.isTraceEnabled()) { - LOG.trace("MAJOR COMPACTION loop sum= " + sum - + " discarding now: " + " qualifier=" - + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" - + converter.decodeValue(CellUtil.cloneValue(cell)) - + " timestamp=" + cell.getTimestamp() + " " + this.action); - } - } else { - // not a final value but it's the latest cell for this app - // so include this cell in the list of cells to write back - finalCells.add(cell); - } - } - } - if (summationDone) { - Cell anyCell = currentColumnCells.first(); - List tags = new ArrayList(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), - Bytes.toBytes(FLOW_APP_ID)); - tags.add(t); - t = HBaseTimelineServerUtils.createTag( - AggregationCompactionDimension.APPLICATION_ID.getTagType(), - Bytes.toBytes(FLOW_APP_ID)); - tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); - Cell sumCell = HBaseTimelineServerUtils.createNewCell( - CellUtil.cloneRow(anyCell), - CellUtil.cloneFamily(anyCell), - CellUtil.cloneQualifier(anyCell), - TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(), FLOW_APP_ID), - converter.encodeValue(sum), tagByteArray); - finalCells.add(sumCell); - if (LOG.isTraceEnabled()) { - LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " - + Bytes.toString(CellUtil.cloneQualifier(sumCell)) - + " " + this.action); - } - LOG.info("After major compaction for qualifier=" - + Bytes.toString(CellUtil.cloneQualifier(sumCell)) - + " with currentColumnCells.size=" - + currentColumnCells.size() - + " returning finalCells.size=" + finalCells.size() - + " with sum=" + sum.longValue() - + " with cell timestamp " + sumCell.getTimestamp()); - } else { - String qualifier = ""; - LOG.info("After major compaction for qualifier=" + qualifier - + " with currentColumnCells.size=" - + currentColumnCells.size() - + " returning finalCells.size=" + finalCells.size() - + " with zero sum=" - + sum.longValue()); - } - return finalCells; - } - - /** - * Determines which cell is to be returned based on the values in each cell - * and the comparison operation MIN or MAX. - * - * @param previouslyChosenCell - * @param currentCell - * @param currentAggOp - * @return the cell which is the min (or max) cell - * @throws IOException - */ - private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, - AggregationOperation currentAggOp, NumericValueConverter converter) - throws IOException { - if (previouslyChosenCell == null) { - return currentCell; - } - try { - Number previouslyChosenCellValue = (Number)converter.decodeValue( - CellUtil.cloneValue(previouslyChosenCell)); - Number currentCellValue = (Number) converter.decodeValue(CellUtil - .cloneValue(currentCell)); - switch (currentAggOp) { - case GLOBAL_MIN: - if (converter.compare( - currentCellValue, previouslyChosenCellValue) < 0) { - // new value is minimum, hence return this cell - return currentCell; - } else { - // previously chosen value is miniumum, hence return previous min cell - return previouslyChosenCell; - } - case GLOBAL_MAX: - if (converter.compare( - currentCellValue, previouslyChosenCellValue) > 0) { - // new value is max, hence return this cell - return currentCell; - } else { - // previously chosen value is max, hence return previous max cell - return previouslyChosenCell; - } - default: - return currentCell; - } - } catch (IllegalArgumentException iae) { - LOG.error("caught iae during conversion to long ", iae); - return currentCell; - } - } - - @Override - public void close() throws IOException { - if (flowRunScanner != null) { - flowRunScanner.close(); - } else { - LOG.warn("scanner close called but scanner is null"); - } - } - - /** - * Called to signal the start of the next() call by the scanner. - */ - public void startNext() { - currentRow = null; - } - - /** - * Returns whether or not the underlying scanner has more rows. - */ - public boolean hasMore() { - return currentIndex < availableCells.size() ? true : hasMore; - } - - /** - * Returns the next available cell for the current row and advances the - * pointer to the next cell. This method can be called multiple times in a row - * to advance through all the available cells. - * - * @param scannerContext - * context information for the batch of cells under consideration - * @return the next available cell or null if no more cells are available for - * the current row - * @throws IOException - */ - public Cell nextCell(ScannerContext scannerContext) throws IOException { - Cell cell = peekAtNextCell(scannerContext); - if (cell != null) { - currentIndex++; - } - return cell; - } - - /** - * Returns the next available cell for the current row, without advancing the - * pointer. Calling this method multiple times in a row will continue to - * return the same cell. - * - * @param scannerContext - * context information for the batch of cells under consideration - * @return the next available cell or null if no more cells are available for - * the current row - * @throws IOException if any problem is encountered while grabbing the next - * cell. - */ - public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException { - if (currentIndex >= availableCells.size()) { - // done with current batch - availableCells.clear(); - currentIndex = 0; - hasMore = flowRunScanner.next(availableCells, scannerContext); - } - Cell cell = null; - if (currentIndex < availableCells.size()) { - cell = availableCells.get(currentIndex); - if (currentRow == null) { - currentRow = CellUtil.cloneRow(cell); - } else if (!CellUtil.matchingRow(cell, currentRow)) { - // moved on to the next row - // don't use the current cell - // also signal no more cells for this row - return null; - } - } - return cell; - } - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() - */ - @Override - public long getMaxResultSize() { - if (regionScanner == null) { - throw new IllegalStateException( - "RegionScanner.isFilterDone() called when the flow " - + "scanner's scanner is not a RegionScanner"); - } - return regionScanner.getMaxResultSize(); - } - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() - */ - @Override - public long getMvccReadPoint() { - if (regionScanner == null) { - throw new IllegalStateException( - "RegionScanner.isFilterDone() called when the flow " - + "scanner's internal scanner is not a RegionScanner"); - } - return regionScanner.getMvccReadPoint(); - } - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() - */ - @Override - public boolean isFilterDone() throws IOException { - if (regionScanner == null) { - throw new IllegalStateException( - "RegionScanner.isFilterDone() called when the flow " - + "scanner's internal scanner is not a RegionScanner"); - } - return regionScanner.isFilterDone(); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) - */ - @Override - public boolean reseek(byte[] bytes) throws IOException { - if (regionScanner == null) { - throw new IllegalStateException( - "RegionScanner.reseek() called when the flow " - + "scanner's internal scanner is not a RegionScanner"); - } - return regionScanner.reseek(bytes); - } - - @Override - public int getBatch() { - return batchSize; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java deleted file mode 100644 index 73c666fa9a..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - - -/** - * Identifies the scanner operation on the {@link FlowRunTable}. - */ -public enum FlowScannerOperation { - - /** - * If the scanner is opened for reading - * during preGet or preScan. - */ - READ, - - /** - * If the scanner is opened during preFlush. - */ - FLUSH, - - /** - * If the scanner is opened during minor Compaction. - */ - MINOR_COMPACTION, - - /** - * If the scanner is opened during major Compaction. - */ - MAJOR_COMPACTION -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java deleted file mode 100644 index 04963f3f1d..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow - * contains classes related to implementation for flow related tables, viz. flow - * run table and flow activity table. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java deleted file mode 100644 index e78db2a1ef..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage contains - * classes which define and implement reading and writing to backend storage. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml index 3ca443f81b..d06907d5d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml @@ -24,36 +24,138 @@ org.apache.hadoop 3.2.0-SNAPSHOT + 4.0.0 - hadoop-yarn-server-timelineservice-hbase-server + Apache Hadoop YARN TimelineService HBase Server 3.2.0-SNAPSHOT - Apache Hadoop YARN TimelineService HBase Servers - pom - - - hbase1 - - - !hbase.profile - - - - hadoop-yarn-server-timelineservice-hbase-server-1 - - - - hbase2 - - - hbase.profile - 2.0 - - - - hadoop-yarn-server-timelineservice-hbase-server-2 - - - + + + ${project.parent.parent.parent.basedir} + + + + + commons-logging + commons-logging + + + + com.google.guava + guava + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.hadoop + hadoop-yarn-api + provided + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice-hbase-common + + + + org.apache.hbase + hbase-common + provided + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty-util + + + + + + org.apache.hbase + hbase-client + provided + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + + + org.apache.hbase + hbase-server + provided + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-hdfs-client + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + jetty-sslengine + + + + + + + + + maven-assembly-plugin + + src/assembly/coprocessor.xml + true + + + + create-coprocessor-jar + prepare-package + + single + + + + + + maven-jar-plugin + + + + test-jar + + + + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml index faadc1d647..01ff0dd063 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/assembly/coprocessor.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml @@ -16,8 +16,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd"> coprocessor jar @@ -31,7 +30,7 @@ runtime org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common - org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-2 + org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java similarity index 60% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java index 3a9e2596c2..5c07670a62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java @@ -20,10 +20,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; @@ -31,7 +28,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; /** * A utility class used by hbase-server module. @@ -53,7 +49,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) { AggregationOperation aggOp = AggregationOperation .getAggregationOperation(attribute.getKey()); if (aggOp != null) { - Tag t = createTag(aggOp.getTagType(), attribute.getValue()); + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); return t; } @@ -61,7 +57,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) { AggregationCompactionDimension.getAggregationCompactionDimension( attribute.getKey()); if (aggCompactDim != null) { - Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue()); + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); return t; } return null; @@ -100,45 +96,6 @@ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, newValue, tags); } - /** - * Create a Tag. - * @param tagType tag type - * @param tag the content of the tag in byte array. - * @return an instance of Tag - */ - public static Tag createTag(byte tagType, byte[] tag) { - return new Tag(tagType, tag); - } - - /** - * Create a Tag. - * @param tagType tag type - * @param tag the content of the tag in String. - * @return an instance of Tag - */ - public static Tag createTag(byte tagType, String tag) { - return createTag(tagType, Bytes.toBytes(tag)); - } - - /** - * Convert a cell to a list of tags. - * @param cell the cell to convert - * @return a list of tags - */ - public static List convertCellAsTagList(Cell cell) { - return Tag.asList( - cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - } - - /** - * Convert a list of tags to a byte array. - * @param tags the list of tags to convert - * @return byte array representation of the list of tags - */ - public static byte[] convertTagListToByteArray(List tags) { - return Tag.fromList(tags); - } - /** * returns app id from the list of tags. * @@ -175,48 +132,4 @@ public static AggregationOperation getAggregationOperationFromTagsList( } return null; } - - // flush and compact all the regions of the primary table - - /** - * Flush and compact all regions of a table. - * @param server region server - * @param table the table to flush and compact - * @return the number of regions flushed and compacted - */ - public static int flushCompactTableRegions(HRegionServer server, - TableName table) throws IOException { - List regions = server.getOnlineRegions(table); - for (Region region : regions) { - region.flush(true); - region.compact(true); - } - return regions.size(); - } - - /** - * Check the existence of FlowRunCoprocessor in a table. - * @param server region server - * @param table table to check - * @param existenceExpected true if the FlowRunCoprocessor is expected - * to be loaded in the table, false otherwise - * @throws Exception - */ - public static void validateFlowRunCoprocessor(HRegionServer server, - TableName table, boolean existenceExpected) throws Exception { - List regions = server.getOnlineRegions(table); - for (Region region : regions) { - boolean found = false; - Set coprocs = region.getCoprocessorHost().getCoprocessors(); - for (String coprocName : coprocs) { - if (coprocName.contains("FlowRunCoprocessor")) { - found = true; - } - } - if (found != existenceExpected) { - throw new Exception("FlowRunCoprocessor is" + - (existenceExpected ? " not " : " ") + "loaded in table " + table); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index c526f58517..41a371b522 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -102,8 +102,7 @@ public void prePut(ObserverContext e, Put put, tags.add(t); } } - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); NavigableMap> newFamilyMap = new TreeMap<>( Bytes.BYTES_COMPARATOR); for (Map.Entry> entry : put.getFamilyCellMap() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 31122cae65..7f09e518ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -247,7 +247,8 @@ private boolean nextInternal(List cells, ScannerContext scannerContext) } private AggregationOperation getCurrentAggOp(Cell cell) { - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); // We assume that all the operations for a particular column are the same return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags); } @@ -315,7 +316,8 @@ private void collectCells(SortedSet currentColumnCells, } // only if this app has not been seen yet, add to current column cells - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); String aggDim = HBaseTimelineServerUtils .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { @@ -452,7 +454,8 @@ List processSummationMajorCompaction( for (Cell cell : currentColumnCells) { AggregationOperation cellAggOp = getCurrentAggOp(cell); // if this is the existing flow sum cell - List tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); String appId = HBaseTimelineServerUtils .getAggregationCompactionDimension(tags); if (appId == FLOW_APP_ID) { @@ -488,16 +491,13 @@ List processSummationMajorCompaction( if (summationDone) { Cell anyCell = currentColumnCells.first(); List tags = new ArrayList(); - Tag t = HBaseTimelineServerUtils.createTag( - AggregationOperation.SUM_FINAL.getTagType(), + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), Bytes.toBytes(FLOW_APP_ID)); tags.add(t); - t = HBaseTimelineServerUtils.createTag( - AggregationCompactionDimension.APPLICATION_ID.getTagType(), + t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(), Bytes.toBytes(FLOW_APP_ID)); tags.add(t); - byte[] tagByteArray = - HBaseTimelineServerUtils.convertTagListToByteArray(tags); + byte[] tagByteArray = Tag.fromList(tags); Cell sumCell = HBaseTimelineServerUtils.createNewCell( CellUtil.cloneRow(anyCell), CellUtil.cloneFamily(anyCell), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java