YARN-5070. upgrade HBase version for first merge (Vrushali C via sjlee)

commit 1a227744ac
parent 1ff6833bba
Author: Sangjin Lee
Date:   2016-06-15 11:43:36 -07:00

5 changed files with 393 additions and 89 deletions

---- File 1 of 5 ----

@@ -49,8 +49,8 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
     <kafka.version>0.8.2.1</kafka.version>
-    <hbase.version>1.0.1</hbase.version>
-    <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+    <hbase.version>1.1.3</hbase.version>
+    <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
     <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
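
Everything else in this commit follows from this two-property bump: HBase moves from 1.0.1 to 1.1.3, and Phoenix moves to the 4.7.0 line built against HBase 1.1. One quick way to confirm the intended client actually landed on the classpath is HBase's own VersionInfo; a minimal sketch, not part of the commit (the class name HBaseVersionCheck is ours):

    import org.apache.hadoop.hbase.util.VersionInfo;

    // Classpath sanity check: verifies that the hbase-client jar resolved by
    // Maven matches the <hbase.version> property above.
    public final class HBaseVersionCheck {
      public static void main(String[] args) {
        String version = VersionInfo.getVersion(); // e.g. "1.1.3"
        System.out.println("hbase-client on classpath: " + version);
        if (!version.startsWith("1.1.")) {
          throw new IllegalStateException(
              "expected an HBase 1.1.x client, found " + version);
        }
      }
    }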

---- File 2 of 5 ----

@@ -41,7 +41,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -107,8 +107,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in flow run table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
@@ -122,8 +122,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in flow activity table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
@@ -137,8 +137,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in entity run table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
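
HBase 1.1 exposes regions through the Region interface rather than the concrete HRegion class, and HRegionServer#getOnlineRegions now returns List<Region>; that is the entire change in the three hunks above. A minimal sketch of the 1.1-style pattern (the class name RegionLister is ours):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.regionserver.Region;

    // Enumerate a table's online regions through the Region interface.
    final class RegionLister {
      static void printRegions(HRegionServer server, TableName table)
          throws IOException {
        List<Region> regions = server.getOnlineRegions(table);
        for (Region region : regions) {
          // region metadata is reached via getRegionInfo() on the interface
          System.out.println(region.getRegionInfo().getRegionNameAsString());
        }
      }
    }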
@@ -311,6 +311,9 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
 
+    // check various batch limits in scanning the table for this flow
+    checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1);
+
     // use the timeline reader to verify data
     HBaseTimelineReaderImpl hbr = null;
     try {
@@ -350,6 +353,157 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
     }
   }
 
+  /*
+   * checks the batch limits on a scan
+   */
+  void checkFlowRunTableBatchLimit(String cluster, String user,
+      String flow, long runid, Configuration c1) throws IOException {
+
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow =
+        new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+    s.setStartRow(startRow);
+    // set a batch limit
+    int batchLimit = 2;
+    s.setBatch(batchLimit);
+    String clusterStop = cluster + "1";
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn
+        .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      assertTrue(values.size() <= batchLimit);
+      loopCount++;
+    }
+    assertTrue(loopCount > 0);
+
+    // test with a different batch limit
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 1;
+    s.setBatch(batchLimit);
+    s.setMaxResultsPerColumnFamily(2);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(batchLimit, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      assertEquals(batchLimit, values.size());
+      loopCount++;
+    }
+    assertTrue(loopCount > 0);
+
+    // test with a batch limit set high enough;
+    // we expect all 3 cells back since there are
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 100;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // assert that with every next invocation
+      // we get back <= batchLimit values
+      assertTrue(values.size() <= batchLimit);
+      assertTrue(values.size() == 3); // see comment above
+      loopCount++;
+    }
+    // should loop through only once
+    assertTrue(loopCount == 1);
+
+    // set the batch limit to a negative number;
+    // a non-positive value is ignored, so we expect all 3 cells back:
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = -671;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(3, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // the batch limit is ignored here;
+      // all cells of the row come back together
+      assertEquals(3, values.size());
+      loopCount++;
+    }
+    // should loop through only once
+    assertEquals(1, loopCount);
+
+    // set the batch limit to 0;
+    // it is likewise ignored, so we expect all 3 cells back:
+    // column = m!HDFS_BYTES_READ value=57
+    // column = m!MAP_SLOT_MILLIS value=141
+    // column min_start_time value=1425016501000
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(startRow);
+    // set a batch limit
+    batchLimit = 0;
+    s.setBatch(batchLimit);
+    s.setStopRow(stopRow);
+    scanner = table1.getScanner(s);
+
+    loopCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(3, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertNotNull(values);
+      // the batch limit is ignored here;
+      // all cells of the row come back together
+      assertEquals(3, values.size());
+      loopCount++;
+    }
+    // should loop through only once
+    assertEquals(1, loopCount);
+  }
+
   private void checkFlowRunTable(String cluster, String user, String flow,
       long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
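
Scan#setBatch(int) caps the number of cells returned per Result, so a row with more cells than the batch size is split across several consecutive Results; a non-positive value disables the cap. A standalone sketch of the arithmetic (the class name BatchMath is ours):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // With N cells in a row and Scan#setBatch(b), b > 0, the row comes back
    // as ceil(N / b) partial Results; b <= 0 yields a single Result.
    final class BatchMath {
      static int resultsForRow(Table table, byte[] row, byte[] family,
          int batch) throws IOException {
        Scan scan = new Scan();
        scan.addFamily(family);
        scan.setStartRow(row);
        // stop just past this row so only its cells are counted
        scan.setStopRow(Bytes.add(row, new byte[] {0}));
        scan.setBatch(batch);
        int partials = 0;
        try (ResultScanner scanner = table.getScanner(scan)) {
          for (Result ignored : scanner) {
            partials++;
          }
        }
        return partials;
      }
    }

Under this arithmetic the batch-1 block above sees one cell per Result, while the batch-100, negative, and zero blocks each see the whole three-cell row in a single Result, hence the loopCount assertions.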

---- File 3 of 5 ----

@@ -45,7 +45,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -123,6 +123,153 @@ public void testWriteNonNumericData() throws Exception {
     assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value);
   }
 
+  @Test
+  public void testWriteScanBatchLimit() throws Exception {
+    String rowKey = "nonNumericRowKey";
+    String column = "nonNumericColumnName";
+    String value = "nonNumericValue";
+    String column2 = "nonNumericColumnName2";
+    String value2 = "nonNumericValue2";
+    String column3 = "nonNumericColumnName3";
+    String value3 = "nonNumericValue3";
+    String column4 = "nonNumericColumnName4";
+    String value4 = "nonNumericValue4";
+
+    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+    byte[] columnNameBytes = Bytes.toBytes(column);
+    byte[] valueBytes = Bytes.toBytes(value);
+    byte[] columnName2Bytes = Bytes.toBytes(column2);
+    byte[] value2Bytes = Bytes.toBytes(value2);
+    byte[] columnName3Bytes = Bytes.toBytes(column3);
+    byte[] value3Bytes = Bytes.toBytes(value3);
+    byte[] columnName4Bytes = Bytes.toBytes(column4);
+    byte[] value4Bytes = Bytes.toBytes(value4);
+
+    Put p = new Put(rowKeyBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = ConnectionFactory.createConnection(hbaseConf);
+    Table flowRunTable = conn.getTable(table);
+    flowRunTable.put(p);
+
+    String rowKey2 = "nonNumericRowKey2";
+    byte[] rowKey2Bytes = Bytes.toBytes(rowKey2);
+    p = new Put(rowKey2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    String rowKey3 = "nonNumericRowKey3";
+    byte[] rowKey3Bytes = Bytes.toBytes(rowKey3);
+    p = new Put(rowKey3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    int batchLimit = 2;
+    s.setBatch(batchLimit);
+    ResultScanner scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 3;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 1000;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+
+    // test with a negative number;
+    // should have same effect as setting it to a high number
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = -2992;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(4, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      System.out.println(" values size " + values.size() + " " + batchLimit);
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+  }
+
   @Test
   public void testWriteFlowRunCompaction() throws Exception {
     String cluster = "kompaction_cluster1";
@@ -176,13 +323,13 @@ public void testWriteFlowRunCompaction() throws Exception {
     // check in flow run table
     HRegionServer server = util.getRSForFirstRegionInTable(TableName
        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    List<HRegion> regions = server.getOnlineRegions(TableName
+    List<Region> regions = server.getOnlineRegions(TableName
        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
     // flush and compact all the regions of the primary table
-    for (HRegion region : regions) {
-      region.flushcache();
-      region.compactStores(true);
+    for (Region region : regions) {
+      region.flush(true);
+      region.compact(true);
     }
 
     // check flow run for one flow many apps
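
Along with the type rename, the Region interface renames the maintenance calls: HRegion#flushcache() becomes Region#flush(boolean) and HRegion#compactStores(boolean) becomes Region#compact(boolean). A hedged sketch of the 1.1-style loop (the helper name is ours; the meaning of the boolean flags is assumed from the 1.1 javadoc):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.regionserver.Region;

    // Force-flush memstores and major-compact every region handed in,
    // mirroring the API rename in the hunk above.
    final class RegionMaintenance {
      static void flushAndMajorCompact(List<Region> regions)
          throws IOException {
        for (Region region : regions) {
          region.flush(true);   // assumed: true forces the flush
          region.compact(true); // assumed: true requests a major compaction
        }
      }
    }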
@@ -237,7 +384,7 @@ private FlowScanner getFlowScannerForTestingCompaction() {
     request.setIsMajor(true, true);
     // okay to pass in nulls for the constructor arguments
     // because all we want to do is invoke the process summation
-    FlowScanner fs = new FlowScanner(null, -1, null,
+    FlowScanner fs = new FlowScanner(null, null,
         (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
             : FlowScannerOperation.MINOR_COMPACTION));
     assertNotNull(fs);

---- File 4 of 5 ----

@@ -39,7 +39,7 @@
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -59,7 +59,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
 
   private boolean isFlowRunRegion = false;
-  private HRegion region;
+  private Region region;
   /**
    * generate a timestamp that is unique per row in a region this is per region.
    */
@@ -178,7 +178,7 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
     scan.setMaxVersions();
     RegionScanner scanner = null;
     try {
-      scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+      scanner = new FlowScanner(e.getEnvironment(), scan,
           region.getScanner(scan), FlowScannerOperation.READ);
       scanner.next(results);
       e.bypass();
@@ -233,7 +233,7 @@ public RegionScanner postScannerOpen(
     if (!isFlowRunRegion) {
       return scanner;
     }
-    return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+    return new FlowScanner(e.getEnvironment(), scan,
         scanner, FlowScannerOperation.READ);
   }
 
@@ -257,7 +257,7 @@ public InternalScanner preFlush(
             + " storeFilesCount=" + store.getStorefilesCount());
       }
     }
-    return new FlowScanner(c.getEnvironment(), -1, scanner,
+    return new FlowScanner(c.getEnvironment(), scanner,
         FlowScannerOperation.FLUSH);
   }
 
@@ -296,10 +296,9 @@ public InternalScanner preCompact(
       requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
           : FlowScannerOperation.MINOR_COMPACTION);
       LOG.info("Compactionrequest= " + request.toString() + " "
-          + requestOp.toString() + " RegionName="
-          + e.getEnvironment().getRegion().getRegionNameAsString());
+          + requestOp.toString() + " RegionName=" + e.getEnvironment()
+              .getRegion().getRegionInfo().getRegionNameAsString());
     }
-    return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
+    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
   }
 }
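
The coprocessor now hands the whole client Scan to FlowScanner instead of a pre-extracted int batch, while the internal flush/compaction paths use a new overload that takes no Scan at all. The delegation pattern, reduced to a hedged standalone sketch (the class name ScannerShim is ours):

    import org.apache.hadoop.hbase.client.Scan;

    // Internal flush/compaction paths have no client Scan, so a shorter
    // overload forwards null and the batch size falls back to -1 ("no limit").
    final class ScannerShim {
      private final int batchSize;

      ScannerShim() {          // the preFlush/preCompact style of construction
        this(null);
      }

      ScannerShim(Scan scan) { // the preGetOp/postScannerOpen style
        this.batchSize = (scan == null) ? -1 : scan.getBatch();
      }

      int getBatch() {
        return batchSize;
      }
    }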

---- File 5 of 5 ----

@@ -35,10 +35,12 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,9 +70,9 @@ class FlowScanner implements RegionScanner, Closeable {
    */
   private static final String FLOW_APP_ID = "application_00000000000_0000";
 
-  private final HRegion region;
+  private final Region region;
   private final InternalScanner flowRunScanner;
-  private final int limit;
+  private final int batchSize;
   private final long appFinalValueRetentionThreshold;
   private RegionScanner regionScanner;
   private boolean hasMore;
@@ -79,9 +81,15 @@ class FlowScanner implements RegionScanner, Closeable {
   private int currentIndex;
   private FlowScannerOperation action = FlowScannerOperation.READ;
 
-  FlowScanner(RegionCoprocessorEnvironment env, int limit,
+  FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
+      FlowScannerOperation action) {
+    this(env, null, internalScanner, action);
+  }
+
+  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
       InternalScanner internalScanner, FlowScannerOperation action) {
-    this.limit = limit;
+    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+    // TODO initialize other scan attributes like Scan#maxResultSize
     this.flowRunScanner = internalScanner;
     if (internalScanner instanceof RegionScanner) {
       this.regionScanner = (RegionScanner) internalScanner;
@@ -98,7 +106,11 @@ class FlowScanner implements RegionScanner, Closeable {
           YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
           YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" batch size=" + batchSize);
+    }
   }
 
   /*
    * (non-Javadoc)
@@ -112,22 +124,24 @@ public HRegionInfo getRegionInfo() {
   @Override
   public boolean nextRaw(List<Cell> cells) throws IOException {
-    return nextRaw(cells, limit);
+    return nextRaw(cells, ScannerContext.newBuilder().build());
   }
 
   @Override
-  public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   @Override
   public boolean next(List<Cell> cells) throws IOException {
-    return next(cells, limit);
+    return next(cells, ScannerContext.newBuilder().build());
   }
 
   @Override
-  public boolean next(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   /**
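
HBase 1.1 changed InternalScanner/RegionScanner#next and #nextRaw to take a ScannerContext instead of a raw int cell limit, which is why every override above is rewritten. A hedged sketch of driving a scanner with such a context (the class name ContextDriver is ours, and setBatchLimit is assumed from the 1.1 ScannerContext.Builder API):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    // Roughly what the removed next(cells, cellLimit) overload used to
    // express with a bare int: at most batchLimit cells arrive per call.
    final class ContextDriver {
      static void drain(InternalScanner scanner, int batchLimit)
          throws IOException {
        ScannerContext context = ScannerContext.newBuilder()
            .setBatchLimit(batchLimit) // assumed 1.1 Builder method
            .build();
        List<Cell> cells = new ArrayList<>();
        boolean hasMore;
        do {
          cells.clear();
          hasMore = scanner.next(cells, context);
          // process up to batchLimit cells per iteration here
        } while (hasMore);
      }
    }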
@@ -158,17 +172,6 @@ private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
     return GenericConverter.getInstance();
   }
 
-  /**
-   * Checks if the converter is a numeric converter or not. For a converter to
-   * be numeric, it must implement {@link NumericValueConverter} interface.
-   * @param converter
-   * @return true, if converter is of type NumericValueConverter, false
-   *         otherwise.
-   */
-  private static boolean isNumericConverter(ValueConverter converter) {
-    return (converter instanceof NumericValueConverter);
-  }
-
   /**
    * This method loops through the cells in a given row of the
    * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
@@ -176,12 +179,11 @@
    * column or returns the cell as is.
    *
    * @param cells
-   * @param cellLimit
+   * @param scannerContext
    * @return true if next row is available for the scanner, false otherwise
    * @throws IOException
    */
-  @SuppressWarnings("deprecation")
-  private boolean nextInternal(List<Cell> cells, int cellLimit)
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
       throws IOException {
     Cell cell = null;
     startNext();
@@ -194,48 +196,47 @@ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
     // So all cells in one qualifier come one after the other before we see the
     // next column qualifier
     ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = Separator.EMPTY_BYTES;
+    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
     AggregationOperation currentAggOp = null;
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
     int addedCnt = 0;
     long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
+    int limit = batchSize;
 
-    while (cellLimit <= 0 || addedCnt < cellLimit) {
-      cell = peekAtNextCell(cellLimit);
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
       if (cell == null) {
         break;
       }
-      byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
-      if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
-        if (converter != null && isNumericConverter(converter)) {
-          addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              converter, currentTimestamp);
-        }
+      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (previousColumnQualifier == null) {
+        // first time in loop
+        previousColumnQualifier = currentColumnQualifier;
+      }
+      converter = getValueConverter(currentColumnQualifier);
+      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+            converter, currentTimestamp);
         resetState(currentColumnCells, alreadySeenAggDim);
-        currentColumnQualifier = newColumnQualifier;
+        previousColumnQualifier = currentColumnQualifier;
         currentAggOp = getCurrentAggOp(cell);
-        converter = getValueConverter(newColumnQualifier);
-      }
-      // No operation needs to be performed on non numeric converters.
-      if (!isNumericConverter(converter)) {
-        currentColumnCells.add(cell);
-        nextCell(cellLimit);
-        continue;
+        converter = getValueConverter(currentColumnQualifier);
       }
       collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
-          (NumericValueConverter)converter);
-      nextCell(cellLimit);
+          converter, scannerContext);
+      nextCell(scannerContext);
     }
-    if (!currentColumnCells.isEmpty()) {
-      addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-          converter, currentTimestamp);
+    if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+          currentTimestamp);
       if (LOG.isDebugEnabled()) {
         if (addedCnt > 0) {
           LOG.debug("emitted cells. " + addedCnt + " for " + this.action
               + " rowKey="
-              + FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
+              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
         } else {
           LOG.debug("emitted no cells for " + this.action);
         }
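
The rewritten loop drops the numeric-converter special case: every cell is buffered per column qualifier, and the buffered set is emitted whenever the qualifier changes (plus once at the end, if the batch limit leaves room). The grouping idiom, reduced to a hedged standalone sketch over plain qualifiers (the class name QualifierRuns is ours; aggregation, limits and scanner plumbing are elided):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    // Group a qualifier-sorted cell stream into per-column runs, emitting
    // each run when the qualifier changes -- the same shape as nextInternal.
    final class QualifierRuns {
      static List<List<Cell>> groupByQualifier(List<Cell> sortedCells) {
        List<List<Cell>> runs = new ArrayList<>();
        List<Cell> current = new ArrayList<>();
        byte[] previousQualifier = null;
        for (Cell cell : sortedCells) {
          byte[] qualifier = CellUtil.cloneQualifier(cell);
          if (previousQualifier != null
              && Bytes.compareTo(previousQualifier, qualifier) != 0) {
            runs.add(current);  // qualifier changed: emit the finished run
            current = new ArrayList<>();
          }
          current.add(cell);
          previousQualifier = qualifier;
        }
        if (!current.isEmpty()) {
          runs.add(current);    // trailing run
        }
        return runs;
      }
    }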
@@ -252,7 +253,7 @@ private AggregationOperation getCurrentAggOp(Cell cell) {
   }
 
   /**
-   * resets the parameters to an intialized state for next loop iteration.
+   * resets the parameters to an initialized state for next loop iteration.
    *
    * @param cell
    * @param currentAggOp
@@ -268,12 +269,12 @@ private void resetState(SortedSet<Cell> currentColumnCells,
 
   private void collectCells(SortedSet<Cell> currentColumnCells,
       AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim, NumericValueConverter converter)
-      throws IOException {
+      Set<String> alreadySeenAggDim, ValueConverter converter,
+      ScannerContext scannerContext) throws IOException {
     if (currentAggOp == null) {
       // not a min/max/metric cell, so just return it as is
       currentColumnCells.add(cell);
-      nextCell(limit);
       return;
     }
@@ -284,7 +285,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
     } else {
       Cell currentMinCell = currentColumnCells.first();
       Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
-          converter);
+          (NumericValueConverter) converter);
       if (!currentMinCell.equals(newMinCell)) {
         currentColumnCells.remove(currentMinCell);
         currentColumnCells.add(newMinCell);
@@ -297,7 +298,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
     } else {
       Cell currentMaxCell = currentColumnCells.first();
       Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
-          converter);
+          (NumericValueConverter) converter);
       if (!currentMaxCell.equals(newMaxCell)) {
         currentColumnCells.remove(currentMaxCell);
         currentColumnCells.add(newMaxCell);
@@ -610,15 +611,14 @@ public boolean hasMore() {
    * pointer to the next cell. This method can be called multiple times in a row
    * to advance through all the available cells.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException
    */
-  public Cell nextCell(int cellLimit) throws IOException {
-    Cell cell = peekAtNextCell(cellLimit);
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
     if (cell != null) {
       currentIndex++;
     }
@@ -630,20 +630,19 @@ public Cell nextCell(ScannerContext scannerContext) throws IOException {
    * pointer. Calling this method multiple times in a row will continue to
    * return the same cell.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException if any problem is encountered while grabbing the next
    *           cell.
    */
-  public Cell peekAtNextCell(int cellLimit) throws IOException {
+  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
     if (currentIndex >= availableCells.size()) {
       // done with current batch
       availableCells.clear();
       currentIndex = 0;
-      hasMore = flowRunScanner.next(availableCells, cellLimit);
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
     }
     Cell cell = null;
     if (currentIndex < availableCells.size()) {
@@ -720,4 +719,9 @@ public boolean reseek(byte[] bytes) throws IOException {
     }
     return regionScanner.reseek(bytes);
   }
+
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
 }
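
RegionScanner in HBase 1.1 adds getBatch() to its contract, so a custom scanner like this one must report the batch size it honors. A minimal hedged usage sketch (the class name BatchProbe is ours):

    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    // Callers can now discover the effective batch size directly from the
    // scanner instead of threading an int limit through every next() call.
    final class BatchProbe {
      static boolean isBatched(RegionScanner scanner) {
        return scanner.getBatch() > 0; // non-positive means "no batch limit"
      }
    }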