diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 17c01b54fb..63a75d30a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; @@ -78,7 +78,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { private static HBaseTestingUtility util; private static long ts = System.currentTimeMillis(); private static long dayTs = - TimelineStorageUtils.getTopOfTheDayTimestamp(ts); + HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass public static void setup() throws Exception { @@ -984,7 +984,7 @@ public void testGetFlows() throws Exception { assertEquals(1, entities.size()); long firstFlowActivity = - TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); + HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 3ddb230f30..2778f50df7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -172,7 +172,7 @@ public void testWriteFlowRunMinMax() throws Exception { assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -303,7 +303,8 @@ private void checkFlowActivityTable(String cluster, String user, String flow, assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp( + appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -388,7 +389,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { assertEquals(user, flowActivity.getUser()); assertEquals(flow, flowActivity.getFlowName()); long dayTs = - TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivity.getDate().getTime()); Set flowRuns = flowActivity.getFlowRuns(); assertEquals(3, flowRuns.size()); @@ -443,7 +444,8 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user, assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp( + appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); Map values = result diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 377611fcf9..7f46a5a0fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -109,8 +109,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); + assertTrue(HBaseTimelineStorageUtils.isFlowRunTable( + region.getRegionInfo(), hbaseConf)); } } @@ -124,8 +124,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); + assertFalse(HBaseTimelineStorageUtils.isFlowRunTable( + region.getRegionInfo(), hbaseConf)); } } @@ -139,8 +139,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), - hbaseConf)); + assertFalse(HBaseTimelineStorageUtils.isFlowRunTable( + region.getRegionInfo(), hbaseConf)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 756d57bd32..eb18e28243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.junit.AfterClass; import org.junit.Assert; @@ -417,8 +417,8 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); tags = new ArrayList<>(); @@ -427,8 +427,8 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a recent timestamp and attribute SUM_FINAL - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); + Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); tags = new ArrayList<>(); @@ -437,8 +437,8 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); + Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); currentColumnCells.add(c3); tags = new ArrayList<>(); @@ -447,8 +447,8 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); + Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); currentColumnCells.add(c4); List cells = @@ -517,7 +517,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinal++; @@ -531,7 +531,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); currentColumnCells.add(c1); cellTsNotFinal++; @@ -608,7 +608,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinal++; @@ -622,7 +622,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinalNotExpire++; @@ -636,7 +636,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); currentColumnCells.add(c1); cellTsNotFinal++; @@ -693,8 +693,8 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 120L, Bytes.toBytes(cellValue1), tagByteArray); + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); tags = new ArrayList<>(); @@ -704,8 +704,8 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 130L, Bytes.toBytes(cellValue2), tagByteArray); + Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); List cells = fs.processSummationMajorCompaction(currentColumnCells, new LongConverter(), currentTimestamp); @@ -751,8 +751,8 @@ public void testProcessSummationOneCellSumFinal() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 120L, Bytes.toBytes(1110L), tagByteArray); + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); List cells = fs.processSummationMajorCompaction(currentColumnCells, @@ -789,8 +789,8 @@ public void testProcessSummationOneCell() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - currentTimestamp, Bytes.toBytes(1110L), tagByteArray); + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, + aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); List cells = fs.processSummationMajorCompaction(currentColumnCells, new LongConverter(), currentTimestamp); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java index 4cb46e63a8..c165801306 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java @@ -54,7 +54,8 @@ public byte[] encode(String appIdStr) { byte[] clusterTs = Bytes.toBytes( LongConverter.invertLong(appId.getClusterTimestamp())); System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); - byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId())); + byte[] seqId = Bytes.toBytes( + HBaseTimelineStorageUtils.invertInt(appId.getId())); System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); return appIdBytes; } @@ -79,7 +80,7 @@ public String decode(byte[] appIdBytes) { } long clusterTs = LongConverter.invertLong( Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); - int seqId = TimelineStorageUtils.invertInt( + int seqId = HBaseTimelineStorageUtils.invertInt( Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); return ApplicationId.newInstance(clusterTs, seqId).toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java new file mode 100644 index 0000000000..e93b47053a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * A bunch of utility functions used in HBase TimelineService backend. + */ +public final class HBaseTimelineStorageUtils { + /** milliseconds in one day. */ + public static final long MILLIS_ONE_DAY = 86400000L; + private static final Log LOG = + LogFactory.getLog(HBaseTimelineStorageUtils.class); + + private HBaseTimelineStorageUtils() { + } + + + /** + * Combines the input array of attributes and the input aggregation operation + * into a new array of attributes. + * + * @param attributes Attributes to be combined. + * @param aggOp Aggregation operation. + * @return array of combined attributes. + */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * Returns a number for the new array size. The new array is the combination + * of input array of attributes and the input aggregation operation. + * + * @param attributes Attributes. + * @param aggOp Aggregation operation. + * @return the size for the new array + */ + private static int getNewLengthCombinedAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int oldLength = getAttributesLength(attributes); + int aggLength = getAppOpLength(aggOp); + return oldLength + aggLength; + } + + private static int getAppOpLength(AggregationOperation aggOp) { + if (aggOp != null) { + return 1; + } + return 0; + } + + private static int getAttributesLength(Attribute[] attributes) { + if (attributes != null) { + return attributes.length; + } + return 0; + } + + /** + * Returns the first seen aggregation operation as seen in the list of input + * tags or null otherwise. + * + * @param tags list of HBase tags. + * @return AggregationOperation + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + /** + * Creates a {@link Tag} from the input attribute. + * + * @param attribute Attribute from which tag has to be fetched. + * @return a HBase Tag. + */ + public static Tag getTagFromAttribute(Map.Entry attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = + AggregationCompactionDimension.getAggregationCompactionDimension( + attribute.getKey()); + if (aggCompactDim != null) { + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + + /** + * creates a new cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell. + */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * creates a cell with the given inputs. + * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * returns app id from the list of tags. + * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension + */ + public static String getAggregationCompactionDimension(List tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + return appId; + } + } + return appId; + } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME); + if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) { + HBaseTimelineStorageUtils.LOG.debug("regionTableName=" + regionTableName); + } + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) { + HBaseTimelineStorageUtils.LOG.debug( + "table is the flow run table!! " + flowRunTableName); + } + return true; + } + return false; + } + + /** + * Converts an int into it's inverse int to be used in (row) keys + * where we want to have the largest int value in the top of the table + * (scans start at the largest int first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted int + */ + public static int invertInt(int key) { + return Integer.MAX_VALUE - key; + } + + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp. + * + * @param ts Timestamp. + * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index aa9a793ef4..9b83659ca0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -20,22 +20,13 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -47,10 +38,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * A bunch of utility functions used across TimelineReader and TimelineWriter. @@ -63,133 +50,6 @@ private TimelineStorageUtils() { private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class); - /** milliseconds in one day. */ - public static final long MILLIS_ONE_DAY = 86400000L; - - /** - * Converts an int into it's inverse int to be used in (row) keys - * where we want to have the largest int value in the top of the table - * (scans start at the largest int first). - * - * @param key value to be inverted so that the latest version will be first in - * a scan. - * @return inverted int - */ - public static int invertInt(int key) { - return Integer.MAX_VALUE - key; - } - - /** - * returns the timestamp of that day's start (which is midnight 00:00:00 AM) - * for a given input timestamp. - * - * @param ts Timestamp. - * @return timestamp of that day's beginning (midnight) - */ - public static long getTopOfTheDayTimestamp(long ts) { - long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); - return dayTimestamp; - } - - /** - * Combines the input array of attributes and the input aggregation operation - * into a new array of attributes. - * - * @param attributes Attributes to be combined. - * @param aggOp Aggregation operation. - * @return array of combined attributes. - */ - public static Attribute[] combineAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int newLength = getNewLengthCombinedAttributes(attributes, aggOp); - Attribute[] combinedAttributes = new Attribute[newLength]; - - if (attributes != null) { - System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); - } - - if (aggOp != null) { - Attribute a2 = aggOp.getAttribute(); - combinedAttributes[newLength - 1] = a2; - } - return combinedAttributes; - } - - /** - * Returns a number for the new array size. The new array is the combination - * of input array of attributes and the input aggregation operation. - * - * @param attributes Attributes. - * @param aggOp Aggregation operation. - * @return the size for the new array - */ - private static int getNewLengthCombinedAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int oldLength = getAttributesLength(attributes); - int aggLength = getAppOpLength(aggOp); - return oldLength + aggLength; - } - - private static int getAppOpLength(AggregationOperation aggOp) { - if (aggOp != null) { - return 1; - } - return 0; - } - - private static int getAttributesLength(Attribute[] attributes) { - if (attributes != null) { - return attributes.length; - } - return 0; - } - - /** - * Returns the first seen aggregation operation as seen in the list of input - * tags or null otherwise. - * - * @param tags list of HBase tags. - * @return AggregationOperation - */ - public static AggregationOperation getAggregationOperationFromTagsList( - List tags) { - for (AggregationOperation aggOp : AggregationOperation.values()) { - for (Tag tag : tags) { - if (tag.getType() == aggOp.getTagType()) { - return aggOp; - } - } - } - return null; - } - - /** - * Creates a {@link Tag} from the input attribute. - * - * @param attribute Attribute from which tag has to be fetched. - * @return a HBase Tag. - */ - public static Tag getTagFromAttribute(Entry attribute) { - // attribute could be either an Aggregation Operation or - // an Aggregation Dimension - // Get the Tag type from either - AggregationOperation aggOp = AggregationOperation - .getAggregationOperation(attribute.getKey()); - if (aggOp != null) { - Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); - return t; - } - - AggregationCompactionDimension aggCompactDim = - AggregationCompactionDimension.getAggregationCompactionDimension( - attribute.getKey()); - if (aggCompactDim != null) { - Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); - return t; - } - return null; - } - /** * Matches key-values filter. Used for relatesTo/isRelatedTo filters. * @@ -516,71 +376,4 @@ public static boolean isIntegralValue(Object obj) { (obj instanceof Long); } - /** - * creates a new cell based on the input cell but with the new value. - * - * @param origCell Original cell - * @param newValue new cell value - * @return cell - * @throws IOException while creating new cell. - */ - public static Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - - /** - * creates a cell with the given inputs. - * - * @param row row of the cell to be created - * @param family column family name of the new cell - * @param qualifier qualifier for the new cell - * @param ts timestamp of the new cell - * @param newValue value of the new cell - * @param tags tags in the new cell - * @return cell - * @throws IOException while creating the cell. - */ - public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, - long ts, byte[] newValue, byte[] tags) throws IOException { - return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, - newValue, tags); - } - - /** - * returns app id from the list of tags. - * - * @param tags cell tags to be looked into - * @return App Id as the AggregationCompactionDimension - */ - public static String getAggregationCompactionDimension(List tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(t.getValue()); - return appId; - } - } - return appId; - } - - public static boolean isFlowRunTable(HRegionInfo hRegionInfo, - Configuration conf) { - String regionTableName = hRegionInfo.getTable().getNameAsString(); - String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, - FlowRunTable.DEFAULT_TABLE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("regionTableName=" + regionTableName); - } - if (flowRunTableName.equalsIgnoreCase(regionTableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" table is the flow run table!! " + flowRunTableName); - } - return true; - } - return false; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 71c3d907d1..439e0c8fee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -26,9 +26,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -144,8 +144,8 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( - attributes, this.aggOp); + Attribute[] combinedAttributes = + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } @@ -269,8 +269,8 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( - attributes, this.aggOp); + Attribute[] combinedAttributes = + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, null, inputValue, combinedAttributes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index d10608a49a..bb77e36d4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -18,10 +18,10 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the flow activity table. @@ -59,7 +59,7 @@ protected FlowActivityRowKey(String clusterId, Long timestamp, String userId, String flowName, boolean convertDayTsToTopOfDay) { this.clusterId = clusterId; if (convertDayTsToTopOfDay && (timestamp != null)) { - this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp); + this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp); } else { this.dayTs = timestamp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index 2e7a9d853e..90dd3456ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -113,8 +113,8 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, Object inputValue, Attribute... attributes) throws IOException { - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( - attributes, aggOp); + Attribute[] combinedAttributes = + HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp); column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, inputValue, combinedAttributes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index e74282a76b..278d18eb44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -26,10 +26,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -136,7 +136,7 @@ public void store(byte[] rowKey, byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = - TimelineStorageUtils.combineAttributes(attributes, this.aggOp); + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } @@ -163,7 +163,7 @@ public void store(byte[] rowKey, byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = - TimelineStorageUtils.combineAttributes(attributes, this.aggOp); + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index a9dcfaad24..2be6ef80a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; /** @@ -71,7 +71,7 @@ public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); - isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable( region.getRegionInfo(), env.getConfiguration()); } } @@ -107,7 +107,7 @@ public void prePut(ObserverContext e, Put put, List tags = new ArrayList<>(); if ((attributes != null) && (attributes.size() > 0)) { for (Map.Entry attribute : attributes.entrySet()) { - Tag t = TimelineStorageUtils.getTagFromAttribute(attribute); + Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute); tags.add(t); } byte[] tagByteArray = Tag.fromList(tags); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 6e67722617..0e3c8ee1cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -249,7 +249,7 @@ private AggregationOperation getCurrentAggOp(Cell cell) { List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); // We assume that all the operations for a particular column are the same - return TimelineStorageUtils.getAggregationOperationFromTagsList(tags); + return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags); } /** @@ -323,7 +323,7 @@ private void collectCells(SortedSet currentColumnCells, // only if this app has not been seen yet, add to current column cells List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String aggDim = TimelineStorageUtils + String aggDim = HBaseTimelineStorageUtils .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { // if this agg dimension has already been seen, @@ -418,7 +418,8 @@ private Cell processSummation(SortedSet currentColumnCells, sum = converter.add(sum, currentValue); } byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); + Cell sumCell = + HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); return sumCell; } @@ -460,7 +461,7 @@ List processSummationMajorCompaction( // if this is the existing flow sum cell List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String appId = TimelineStorageUtils + String appId = HBaseTimelineStorageUtils .getAggregationCompactionDimension(tags); if (appId == FLOW_APP_ID) { sum = converter.add(sum, currentValue); @@ -502,7 +503,7 @@ List processSummationMajorCompaction( Bytes.toBytes(FLOW_APP_ID)); tags.add(t); byte[] tagByteArray = Tag.fromList(tags); - Cell sumCell = TimelineStorageUtils.createNewCell( + Cell sumCell = HBaseTimelineStorageUtils.createNewCell( CellUtil.cloneRow(anyCell), CellUtil.cloneFamily(anyCell), CellUtil.cloneQualifier(anyCell), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 368b0604d5..5beb18941c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -185,7 +185,7 @@ public void testEntityRowKey() { @Test public void testFlowActivityRowKey() { Long ts = 1459900830000L; - Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts); + Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); byte[] byteRowKey = new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey(); FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);