From 1a227744ac0ceff178171fc4ddbf3d27275bdc4f Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Wed, 15 Jun 2016 11:43:36 -0700 Subject: [PATCH] YARN-5070. upgrade HBase version for first merge (Vrushali C via sjlee) --- hadoop-project/pom.xml | 4 +- .../storage/flow/TestHBaseStorageFlowRun.java | 168 +++++++++++++++++- .../TestHBaseStorageFlowRunCompaction.java | 159 ++++++++++++++++- .../storage/flow/FlowRunCoprocessor.java | 17 +- .../storage/flow/FlowScanner.java | 134 +++++++------- 5 files changed, 393 insertions(+), 89 deletions(-) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 9b305700b2..bb46de0bf4 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,8 +49,8 @@ 2.11.0 0.8.2.1 - 1.0.1 - 4.5.0-SNAPSHOT + 1.1.3 + 4.7.0-HBase-1.1 2.5.1 ${project.version} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 328b25a567..6c4c810758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -107,8 +107,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { // check in flow run table util.waitUntilAllRegionsAssigned(table); HRegionServer server = util.getRSForFirstRegionInTable(table); - List regions = server.getOnlineRegions(table); - for (HRegion region : regions) { + List regions = server.getOnlineRegions(table); + for (Region region : regions) { assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } @@ -122,8 +122,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { // check in flow activity table util.waitUntilAllRegionsAssigned(table); HRegionServer server = util.getRSForFirstRegionInTable(table); - List regions = server.getOnlineRegions(table); - for (HRegion region : regions) { + List regions = server.getOnlineRegions(table); + for (Region region : regions) { assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } @@ -137,8 +137,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { // check in entity run table util.waitUntilAllRegionsAssigned(table); HRegionServer server = util.getRSForFirstRegionInTable(table); - List regions = server.getOnlineRegions(table); - for (HRegion region : regions) { + List regions = server.getOnlineRegions(table); + for (Region region : regions) { assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), 
hbaseConf)); } @@ -311,6 +311,9 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { // check flow run checkFlowRunTable(cluster, user, flow, runid, c1); + // check various batch limits in scanning the table for this flow + checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1); + // use the timeline reader to verify data HBaseTimelineReaderImpl hbr = null; try { @@ -350,6 +353,157 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { } } + /* + * checks the batch limits on a scan + */ + void checkFlowRunTableBatchLimit(String cluster, String user, + String flow, long runid, Configuration c1) throws IOException { + + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); + s.setStartRow(startRow); + // set a batch limit + int batchLimit = 2; + s.setBatch(batchLimit); + String clusterStop = cluster + "1"; + byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn + .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int loopCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertNotNull(values); + assertTrue(values.size() <= batchLimit); + loopCount++; + } + assertTrue(loopCount > 0); + + // test with a diff batch limit + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(startRow); + // set a batch limit + batchLimit = 1; + s.setBatch(batchLimit); + s.setMaxResultsPerColumnFamily(2); + s.setStopRow(stopRow); + scanner = table1.getScanner(s); + + loopCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertEquals(batchLimit, result.rawCells().length); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertNotNull(values); + assertEquals(batchLimit, values.size()); + loopCount++; + } + assertTrue(loopCount > 0); + + // test with a diff batch limit + // set it high enough + // we expect back 3 since there are + // column = m!HDFS_BYTES_READ value=57 + // column = m!MAP_SLOT_MILLIS value=141 + // column min_start_time value=1425016501000 + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(startRow); + // set a batch limit + batchLimit = 100; + s.setBatch(batchLimit); + s.setStopRow(stopRow); + scanner = table1.getScanner(s); + + loopCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertNotNull(values); + // assert that with every next invocation + // we get back <= batchLimit values + assertTrue(values.size() <= batchLimit); + assertTrue(values.size() == 3); // see comment above + loopCount++; + } + // should loop through only once + assertTrue(loopCount == 1); + + // set it to a negative number + // we expect all 3 back since there are + // column = m!HDFS_BYTES_READ value=57 + // column = m!MAP_SLOT_MILLIS value=141 + // column min_start_time value=1425016501000 + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(startRow); + // 
set a batch limit + batchLimit = -671; + s.setBatch(batchLimit); + s.setStopRow(stopRow); + scanner = table1.getScanner(s); + + loopCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertEquals(3, result.rawCells().length); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertNotNull(values); + // assert that with every next invocation + // we get back <= batchLimit values + assertEquals(3, values.size()); + loopCount++; + } + // should loop through only once + assertEquals(1, loopCount); + + // set it to 0 + // we expect all 3 back since there are + // column = m!HDFS_BYTES_READ value=57 + // column = m!MAP_SLOT_MILLIS value=141 + // column min_start_time value=1425016501000 + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(startRow); + // set a batch limit + batchLimit = 0; + s.setBatch(batchLimit); + s.setStopRow(stopRow); + scanner = table1.getScanner(s); + + loopCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertEquals(3, result.rawCells().length); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertNotNull(values); + // assert that with every next invocation + // we get back <= batchLimit values + assertEquals(3, values.size()); + loopCount++; + } + // should loop through only once + assertEquals(1, loopCount); + } + private void checkFlowRunTable(String cluster, String user, String flow, long runid, Configuration c1) throws IOException { Scan s = new Scan(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index e1bef535ea..71523b81d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; @@ -123,6 +123,153 @@ public void testWriteNonNumericData() throws Exception { assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); } + @Test + public void testWriteScanBatchLimit() throws Exception { + String rowKey = "nonNumericRowKey"; + String column = "nonNumericColumnName"; + String value = "nonNumericValue"; + String column2 = "nonNumericColumnName2"; + String value2 = "nonNumericValue2"; + String column3 = "nonNumericColumnName3"; + String value3 = "nonNumericValue3"; + String column4 = "nonNumericColumnName4"; + String value4 = "nonNumericValue4"; + + byte[] rowKeyBytes 
= Bytes.toBytes(rowKey); + byte[] columnNameBytes = Bytes.toBytes(column); + byte[] valueBytes = Bytes.toBytes(value); + byte[] columnName2Bytes = Bytes.toBytes(column2); + byte[] value2Bytes = Bytes.toBytes(value2); + byte[] columnName3Bytes = Bytes.toBytes(column3); + byte[] value3Bytes = Bytes.toBytes(value3); + byte[] columnName4Bytes = Bytes.toBytes(column4); + byte[] value4Bytes = Bytes.toBytes(value4); + + Put p = new Put(rowKeyBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Table flowRunTable = conn.getTable(table); + flowRunTable.put(p); + + String rowKey2 = "nonNumericRowKey2"; + byte[] rowKey2Bytes = Bytes.toBytes(rowKey2); + p = new Put(rowKey2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + flowRunTable.put(p); + + String rowKey3 = "nonNumericRowKey3"; + byte[] rowKey3Bytes = Bytes.toBytes(rowKey3); + p = new Put(rowKey3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + flowRunTable.put(p); + + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + int batchLimit = 2; + s.setBatch(batchLimit); + ResultScanner scanner = flowRunTable.getScanner(s); + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + } + + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = 3; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + } + + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = 1000; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map 
values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + // we expect all back in one next call + assertEquals(4, values.size()); + rowCount++; + } + // should get back 1 row with each invocation + // if scan batch is set sufficiently high + assertEquals(3, rowCount); + + // test with a negative number + // should have same effect as setting it to a high number + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = -2992; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertEquals(4, result.rawCells().length); + Map values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + // we expect all back in one next call + assertEquals(4, values.size()); + System.out.println(" values size " + values.size() + " " + batchLimit ); + rowCount++; + } + // should get back 1 row with each invocation + // if scan batch is set sufficiently high + assertEquals(3, rowCount); + } + @Test public void testWriteFlowRunCompaction() throws Exception { String cluster = "kompaction_cluster1"; @@ -176,13 +323,13 @@ public void testWriteFlowRunCompaction() throws Exception { // check in flow run table HRegionServer server = util.getRSForFirstRegionInTable(TableName .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName + List regions = server.getOnlineRegions(TableName .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); assertTrue("Didn't find any regions for primary table!", regions.size() > 0); // flush and compact all the regions of the primary table - for (HRegion region : regions) { - region.flushcache(); - region.compactStores(true); + for (Region region : regions) { + region.flush(true); + region.compact(true); } // check flow run for one flow many apps @@ -237,7 +384,7 @@ private FlowScanner getFlowScannerForTestingCompaction() { request.setIsMajor(true, true); // okay to pass in nulls for the constructor arguments // because all we want to do is invoke the process summation - FlowScanner fs = new FlowScanner(null, -1, null, + FlowScanner fs = new FlowScanner(null, null, (request.isMajor() == true ? 
FlowScannerOperation.MAJOR_COMPACTION : FlowScannerOperation.MINOR_COMPACTION)); assertNotNull(fs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 8ea51a12bf..a9dcfaad24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; @@ -59,7 +59,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); private boolean isFlowRunRegion = false; - private HRegion region; + private Region region; /** * generate a timestamp that is unique per row in a region this is per region. */ @@ -178,7 +178,7 @@ public void preGetOp(ObserverContext e, scan.setMaxVersions(); RegionScanner scanner = null; try { - scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(), + scanner = new FlowScanner(e.getEnvironment(), scan, region.getScanner(scan), FlowScannerOperation.READ); scanner.next(results); e.bypass(); @@ -233,7 +233,7 @@ public RegionScanner postScannerOpen( if (!isFlowRunRegion) { return scanner; } - return new FlowScanner(e.getEnvironment(), scan.getBatch(), + return new FlowScanner(e.getEnvironment(), scan, scanner, FlowScannerOperation.READ); } @@ -257,7 +257,7 @@ public InternalScanner preFlush( + " storeFilesCount=" + store.getStorefilesCount()); } } - return new FlowScanner(c.getEnvironment(), -1, scanner, + return new FlowScanner(c.getEnvironment(), scanner, FlowScannerOperation.FLUSH); } @@ -296,10 +296,9 @@ public InternalScanner preCompact( requestOp = (request.isMajor() ? 
FlowScannerOperation.MAJOR_COMPACTION : FlowScannerOperation.MINOR_COMPACTION); LOG.info("Compactionrequest= " + request.toString() + " " - + requestOp.toString() + " RegionName=" - + e.getEnvironment().getRegion().getRegionNameAsString()); + + requestOp.toString() + " RegionName=" + e.getEnvironment() + .getRegion().getRegionInfo().getRegionNameAsString()); } - - return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp); + return new FlowScanner(e.getEnvironment(), scanner, requestOp); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 648c77b91e..6e67722617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -35,10 +35,12 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -68,9 +70,9 @@ class FlowScanner implements RegionScanner, Closeable { */ private static final String FLOW_APP_ID = "application_00000000000_0000"; - private final HRegion region; + private final Region region; private final InternalScanner flowRunScanner; - private final int limit; + private final int batchSize; private final long appFinalValueRetentionThreshold; private RegionScanner regionScanner; private boolean hasMore; @@ -79,9 +81,15 @@ class FlowScanner implements RegionScanner, Closeable { private int currentIndex; private FlowScannerOperation action = FlowScannerOperation.READ; - FlowScanner(RegionCoprocessorEnvironment env, int limit, + FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, + FlowScannerOperation action) { + this(env, null, internalScanner, action); + } + + FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, InternalScanner internalScanner, FlowScannerOperation action) { - this.limit = limit; + this.batchSize = incomingScan == null ? 
-1 : incomingScan.getBatch(); + // TODO initialize other scan attributes like Scan#maxResultSize this.flowRunScanner = internalScanner; if (internalScanner instanceof RegionScanner) { this.regionScanner = (RegionScanner) internalScanner; @@ -98,8 +106,12 @@ class FlowScanner implements RegionScanner, Closeable { YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); } + if (LOG.isDebugEnabled()) { + LOG.debug(" batch size=" + batchSize); + } } + /* * (non-Javadoc) * @@ -112,22 +124,24 @@ public HRegionInfo getRegionInfo() { @Override public boolean nextRaw(List cells) throws IOException { - return nextRaw(cells, limit); + return nextRaw(cells, ScannerContext.newBuilder().build()); } @Override - public boolean nextRaw(List cells, int cellLimit) throws IOException { - return nextInternal(cells, cellLimit); + public boolean nextRaw(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); } @Override public boolean next(List cells) throws IOException { - return next(cells, limit); + return next(cells, ScannerContext.newBuilder().build()); } @Override - public boolean next(List cells, int cellLimit) throws IOException { - return nextInternal(cells, cellLimit); + public boolean next(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); } /** @@ -158,17 +172,6 @@ private static ValueConverter getValueConverter(byte[] colQualifierBytes) { return GenericConverter.getInstance(); } - /** - * Checks if the converter is a numeric converter or not. For a converter to - * be numeric, it must implement {@link NumericValueConverter} interface. - * @param converter - * @return true, if converter is of type NumericValueConverter, false - * otherwise. - */ - private static boolean isNumericConverter(ValueConverter converter) { - return (converter instanceof NumericValueConverter); - } - /** * This method loops through the cells in a given row of the * {@link FlowRunTable}. It looks at the tags of each cell to figure out how @@ -176,12 +179,11 @@ private static boolean isNumericConverter(ValueConverter converter) { * column or returns the cell as is. 
* * @param cells - * @param cellLimit + * @param scannerContext * @return true if next row is available for the scanner, false otherwise * @throws IOException */ - @SuppressWarnings("deprecation") - private boolean nextInternal(List cells, int cellLimit) + private boolean nextInternal(List cells, ScannerContext scannerContext) throws IOException { Cell cell = null; startNext(); @@ -194,48 +196,47 @@ private boolean nextInternal(List cells, int cellLimit) // So all cells in one qualifier come one after the other before we see the // next column qualifier ByteArrayComparator comp = new ByteArrayComparator(); - byte[] currentColumnQualifier = Separator.EMPTY_BYTES; + byte[] previousColumnQualifier = Separator.EMPTY_BYTES; AggregationOperation currentAggOp = null; SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set alreadySeenAggDim = new HashSet<>(); int addedCnt = 0; long currentTimestamp = System.currentTimeMillis(); ValueConverter converter = null; + int limit = batchSize; - while (cellLimit <= 0 || addedCnt < cellLimit) { - cell = peekAtNextCell(cellLimit); + while (limit <= 0 || addedCnt < limit) { + cell = peekAtNextCell(scannerContext); if (cell == null) { break; } - byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); - if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { - if (converter != null && isNumericConverter(converter)) { - addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - converter, currentTimestamp); - } - resetState(currentColumnCells, alreadySeenAggDim); - currentColumnQualifier = newColumnQualifier; - currentAggOp = getCurrentAggOp(cell); - converter = getValueConverter(newColumnQualifier); + byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell); + if (previousColumnQualifier == null) { + // first time in loop + previousColumnQualifier = currentColumnQualifier; } - // No operation needs to be performed on non numeric converters. - if (!isNumericConverter(converter)) { - currentColumnCells.add(cell); - nextCell(cellLimit); - continue; + + converter = getValueConverter(currentColumnQualifier); + if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + converter, currentTimestamp); + resetState(currentColumnCells, alreadySeenAggDim); + previousColumnQualifier = currentColumnQualifier; + currentAggOp = getCurrentAggOp(cell); + converter = getValueConverter(currentColumnQualifier); } collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, - (NumericValueConverter)converter); - nextCell(cellLimit); + converter, scannerContext); + nextCell(scannerContext); } - if (!currentColumnCells.isEmpty()) { - addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - converter, currentTimestamp); + if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter, + currentTimestamp); if (LOG.isDebugEnabled()) { if (addedCnt > 0) { LOG.debug("emitted cells. " + addedCnt + " for " + this.action + " rowKey=" - + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString()); + + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0)))); } else { LOG.debug("emitted no cells for " + this.action); } @@ -252,7 +253,7 @@ private AggregationOperation getCurrentAggOp(Cell cell) { } /** - * resets the parameters to an intialized state for next loop iteration. + * resets the parameters to an initialized state for next loop iteration. 
* * @param cell * @param currentAggOp @@ -268,12 +269,12 @@ private void resetState(SortedSet currentColumnCells, private void collectCells(SortedSet currentColumnCells, AggregationOperation currentAggOp, Cell cell, - Set alreadySeenAggDim, NumericValueConverter converter) - throws IOException { + Set alreadySeenAggDim, ValueConverter converter, + ScannerContext scannerContext) throws IOException { + if (currentAggOp == null) { // not a min/max/metric cell, so just return it as is currentColumnCells.add(cell); - nextCell(limit); return; } @@ -284,7 +285,7 @@ private void collectCells(SortedSet currentColumnCells, } else { Cell currentMinCell = currentColumnCells.first(); Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, - converter); + (NumericValueConverter) converter); if (!currentMinCell.equals(newMinCell)) { currentColumnCells.remove(currentMinCell); currentColumnCells.add(newMinCell); @@ -297,7 +298,7 @@ private void collectCells(SortedSet currentColumnCells, } else { Cell currentMaxCell = currentColumnCells.first(); Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, - converter); + (NumericValueConverter) converter); if (!currentMaxCell.equals(newMaxCell)) { currentColumnCells.remove(currentMaxCell); currentColumnCells.add(newMaxCell); @@ -610,15 +611,14 @@ public boolean hasMore() { * pointer to the next cell. This method can be called multiple times in a row * to advance through all the available cells. * - * @param cellLimit - * the limit of number of cells to return if the next batch must be - * fetched by the wrapped scanner + * @param scannerContext + * context information for the batch of cells under consideration * @return the next available cell or null if no more cells are available for * the current row * @throws IOException */ - public Cell nextCell(int cellLimit) throws IOException { - Cell cell = peekAtNextCell(cellLimit); + public Cell nextCell(ScannerContext scannerContext) throws IOException { + Cell cell = peekAtNextCell(scannerContext); if (cell != null) { currentIndex++; } @@ -630,20 +630,19 @@ public Cell nextCell(int cellLimit) throws IOException { * pointer. Calling this method multiple times in a row will continue to * return the same cell. * - * @param cellLimit - * the limit of number of cells to return if the next batch must be - * fetched by the wrapped scanner + * @param scannerContext + * context information for the batch of cells under consideration * @return the next available cell or null if no more cells are available for * the current row * @throws IOException if any problem is encountered while grabbing the next * cell. */ - public Cell peekAtNextCell(int cellLimit) throws IOException { + public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException { if (currentIndex >= availableCells.size()) { // done with current batch availableCells.clear(); currentIndex = 0; - hasMore = flowRunScanner.next(availableCells, cellLimit); + hasMore = flowRunScanner.next(availableCells, scannerContext); } Cell cell = null; if (currentIndex < availableCells.size()) { @@ -720,4 +719,9 @@ public boolean reseek(byte[] bytes) throws IOException { } return regionScanner.reseek(bytes); } + + @Override + public int getBatch() { + return batchSize; + } }
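
Reviewer note (not part of the patch): the new batch-limit tests above rely on HBase's Scan#setBatch contract. With a positive batch, a row wider than the limit is handed back as several partial Results, each carrying at most that many cells; a batch of zero or a negative value leaves the row uncapped, so the whole row arrives in a single Result (this is exactly what the assertions in checkFlowRunTableBatchLimit and testWriteScanBatchLimit verify). Below is a minimal, self-contained client-side sketch of that behavior, assuming HBase 1.1 client APIs; the table name "demotable" and column family "f" are hypothetical placeholders, not the flow run schema.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Standalone sketch of Scan#setBatch semantics; all names are placeholders. */
public class ScanBatchSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("demotable"))) {
      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("f"));
      int batchLimit = 2;
      // cap the number of cells returned per call to ResultScanner#next();
      // 0 or a negative value disables the cap (whole row in one Result)
      scan.setBatch(batchLimit);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result partial : scanner) {
          // a row wider than batchLimit arrives as several partial Results,
          // each holding at most batchLimit cells
          System.out.println("cells in this Result: "
              + partial.rawCells().length);
        }
      }
    }
  }
}

On the server side, the FlowScanner and FlowRunCoprocessor changes in this patch track the HBase 1.1 scanner API: InternalScanner#next and RegionScanner#nextRaw now take a ScannerContext rather than an int cell limit, regions are reached through the Region interface instead of the HRegion class, RegionScanner gained getBatch(), and region.flush(true)/region.compact(true) replace flushcache()/compactStores(true). As a result, FlowScanner derives its batch size from the incoming Scan (falling back to -1 when no Scan is supplied) instead of taking a limit as a constructor argument.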