YARN-5142. fix findbugs warnings/errors for hadoop-yarn-server-timelineservice-hbase-tests. (Vrushali C via Varun Saxena)
This commit is contained in:
parent
0a9b085f05
commit
9c926cf432
@ -345,9 +345,11 @@ private static void loadData() throws Exception {
|
|||||||
"application_1111111111_1111", te5);
|
"application_1111111111_1111", te5);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
@ -390,7 +392,7 @@ private static ClientResponse getResponse(Client client, URI uri)
|
|||||||
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
if (resp == null ||
|
if (resp == null ||
|
||||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||||
String msg = new String();
|
String msg = "";
|
||||||
if (resp != null) {
|
if (resp != null) {
|
||||||
msg = resp.getClientResponseStatus().toString();
|
msg = resp.getClientResponseStatus().toString();
|
||||||
}
|
}
|
||||||
|
@ -678,32 +678,34 @@ public void testWriteApplicationToHBase() throws Exception {
|
|||||||
assertEquals(infoMap, infoColumns);
|
assertEquals(infoMap, infoColumns);
|
||||||
|
|
||||||
// Remember isRelatedTo is of type Map<String, Set<String>>
|
// Remember isRelatedTo is of type Map<String, Set<String>>
|
||||||
for (String isRelatedToKey : isRelatedTo.keySet()) {
|
for (Map.Entry<String, Set<String>> isRelatedToEntry : isRelatedTo
|
||||||
|
.entrySet()) {
|
||||||
Object isRelatedToValue =
|
Object isRelatedToValue =
|
||||||
ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
|
ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
|
||||||
isRelatedToKey);
|
isRelatedToEntry.getKey());
|
||||||
String compoundValue = isRelatedToValue.toString();
|
String compoundValue = isRelatedToValue.toString();
|
||||||
// id7?id9?id6
|
// id7?id9?id6
|
||||||
Set<String> isRelatedToValues =
|
Set<String> isRelatedToValues =
|
||||||
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
|
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
|
||||||
assertEquals(isRelatedTo.get(isRelatedToKey).size(),
|
assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(),
|
||||||
isRelatedToValues.size());
|
isRelatedToValues.size());
|
||||||
for (String v : isRelatedTo.get(isRelatedToKey)) {
|
for (String v : isRelatedToEntry.getValue()) {
|
||||||
assertTrue(isRelatedToValues.contains(v));
|
assertTrue(isRelatedToValues.contains(v));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RelatesTo
|
// RelatesTo
|
||||||
for (String relatesToKey : relatesTo.keySet()) {
|
for (Map.Entry<String, Set<String>> relatesToEntry : relatesTo
|
||||||
|
.entrySet()) {
|
||||||
String compoundValue =
|
String compoundValue =
|
||||||
ApplicationColumnPrefix.RELATES_TO.readResult(result,
|
ApplicationColumnPrefix.RELATES_TO.readResult(result,
|
||||||
relatesToKey).toString();
|
relatesToEntry.getKey()).toString();
|
||||||
// id3?id4?id5
|
// id3?id4?id5
|
||||||
Set<String> relatesToValues =
|
Set<String> relatesToValues =
|
||||||
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
|
new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
|
||||||
assertEquals(relatesTo.get(relatesToKey).size(),
|
assertEquals(relatesTo.get(relatesToEntry.getKey()).size(),
|
||||||
relatesToValues.size());
|
relatesToValues.size());
|
||||||
for (String v : relatesTo.get(relatesToKey)) {
|
for (String v : relatesToEntry.getValue()) {
|
||||||
assertTrue(relatesToValues.contains(v));
|
assertTrue(relatesToValues.contains(v));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -938,41 +940,43 @@ public void testWriteEntityToHBase() throws Exception {
|
|||||||
assertEquals(infoMap, infoColumns);
|
assertEquals(infoMap, infoColumns);
|
||||||
|
|
||||||
// Remember isRelatedTo is of type Map<String, Set<String>>
|
// Remember isRelatedTo is of type Map<String, Set<String>>
|
||||||
for (String isRelatedToKey : isRelatedTo.keySet()) {
|
for (Map.Entry<String, Set<String>> isRelatedToEntry : isRelatedTo
|
||||||
|
.entrySet()) {
|
||||||
Object isRelatedToValue =
|
Object isRelatedToValue =
|
||||||
EntityColumnPrefix.IS_RELATED_TO.readResult(result,
|
EntityColumnPrefix.IS_RELATED_TO.readResult(result,
|
||||||
isRelatedToKey);
|
isRelatedToEntry.getKey());
|
||||||
String compoundValue = isRelatedToValue.toString();
|
String compoundValue = isRelatedToValue.toString();
|
||||||
// id7?id9?id6
|
// id7?id9?id6
|
||||||
Set<String> isRelatedToValues =
|
Set<String> isRelatedToValues =
|
||||||
new HashSet<String>(
|
new HashSet<String>(
|
||||||
Separator.VALUES.splitEncoded(compoundValue));
|
Separator.VALUES.splitEncoded(compoundValue));
|
||||||
assertEquals(isRelatedTo.get(isRelatedToKey).size(),
|
assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(),
|
||||||
isRelatedToValues.size());
|
isRelatedToValues.size());
|
||||||
for (String v : isRelatedTo.get(isRelatedToKey)) {
|
for (String v : isRelatedToEntry.getValue()) {
|
||||||
assertTrue(isRelatedToValues.contains(v));
|
assertTrue(isRelatedToValues.contains(v));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RelatesTo
|
// RelatesTo
|
||||||
for (String relatesToKey : relatesTo.keySet()) {
|
for (Map.Entry<String, Set<String>> relatesToEntry : relatesTo
|
||||||
String compoundValue =
|
.entrySet()) {
|
||||||
EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
|
String compoundValue = EntityColumnPrefix.RELATES_TO
|
||||||
.toString();
|
.readResult(result, relatesToEntry.getKey()).toString();
|
||||||
// id3?id4?id5
|
// id3?id4?id5
|
||||||
Set<String> relatesToValues =
|
Set<String> relatesToValues =
|
||||||
new HashSet<String>(
|
new HashSet<String>(
|
||||||
Separator.VALUES.splitEncoded(compoundValue));
|
Separator.VALUES.splitEncoded(compoundValue));
|
||||||
assertEquals(relatesTo.get(relatesToKey).size(),
|
assertEquals(relatesTo.get(relatesToEntry.getKey()).size(),
|
||||||
relatesToValues.size());
|
relatesToValues.size());
|
||||||
for (String v : relatesTo.get(relatesToKey)) {
|
for (String v : relatesToEntry.getValue()) {
|
||||||
assertTrue(relatesToValues.contains(v));
|
assertTrue(relatesToValues.contains(v));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
Map<String, Object> configColumns =
|
Map<String, Object> configColumns =
|
||||||
EntityColumnPrefix.CONFIG.readResults(result, StringKeyConverter.getInstance());
|
EntityColumnPrefix.CONFIG.readResults(result,
|
||||||
|
StringKeyConverter.getInstance());
|
||||||
assertEquals(conf, configColumns);
|
assertEquals(conf, configColumns);
|
||||||
|
|
||||||
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
|
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
|
||||||
@ -1273,10 +1277,12 @@ public void testEventsWithEmptyInfo() throws IOException {
|
|||||||
assertTrue(info == null || info.isEmpty());
|
assertTrue(info == null || info.isEmpty());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.stop();
|
hbi.stop();
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEventsEscapeTs() throws IOException {
|
public void testEventsEscapeTs() throws IOException {
|
||||||
|
@ -56,7 +56,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
|
|||||||
long ts = insertTs;
|
long ts = insertTs;
|
||||||
|
|
||||||
for (int k = 1; k < 100; k++) {
|
for (int k = 1; k < 100; k++) {
|
||||||
metricValues.put(ts - k*200000, 20L);
|
metricValues.put(ts - k * 200000L, 20L);
|
||||||
}
|
}
|
||||||
metricValues.put(ts - 80000, 40L);
|
metricValues.put(ts - 80000, 40L);
|
||||||
m1.setType(Type.TIME_SERIES);
|
m1.setType(Type.TIME_SERIES);
|
||||||
@ -68,7 +68,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
|
|||||||
metricValues = new HashMap<Long, Number>();
|
metricValues = new HashMap<Long, Number>();
|
||||||
ts = System.currentTimeMillis();
|
ts = System.currentTimeMillis();
|
||||||
for (int k=1; k< 100 ; k++) {
|
for (int k=1; k< 100 ; k++) {
|
||||||
metricValues.put(ts - k*100000, 31L);
|
metricValues.put(ts - k*100000L, 31L);
|
||||||
}
|
}
|
||||||
|
|
||||||
metricValues.put(ts - 80000, 57L);
|
metricValues.put(ts - 80000, 57L);
|
||||||
|
@ -148,8 +148,10 @@ public void testWriteFlowRunMinMax() throws Exception {
|
|||||||
// flush everything to hbase
|
// flush everything to hbase
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Connection conn = ConnectionFactory.createConnection(c1);
|
Connection conn = ConnectionFactory.createConnection(c1);
|
||||||
// check in flow activity table
|
// check in flow activity table
|
||||||
@ -199,9 +201,11 @@ public void testWriteFlowRunMinMax() throws Exception {
|
|||||||
assertEquals(1, flowRuns.size());
|
assertEquals(1, flowRuns.size());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write 1 application entity and checks the record for today in the flow
|
* Write 1 application entity and checks the record for today in the flow
|
||||||
@ -230,8 +234,10 @@ public void testWriteFlowActivityOneFlow() throws Exception {
|
|||||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// check flow activity
|
// check flow activity
|
||||||
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
|
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
|
||||||
appCreatedTime);
|
appCreatedTime);
|
||||||
@ -260,9 +266,11 @@ public void testWriteFlowActivityOneFlow() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkFlowActivityTable(String cluster, String user, String flow,
|
private void checkFlowActivityTable(String cluster, String user, String flow,
|
||||||
String flowVersion, long runid, Configuration c1, long appCreatedTime)
|
String flowVersion, long runid, Configuration c1, long appCreatedTime)
|
||||||
@ -351,8 +359,10 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
|
|||||||
|
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// check flow activity
|
// check flow activity
|
||||||
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
|
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
|
||||||
runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
|
runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
|
||||||
@ -396,9 +406,11 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkFlowActivityTableSeveralRuns(String cluster, String user,
|
private void checkFlowActivityTableSeveralRuns(String cluster, String user,
|
||||||
String flow, Configuration c1, String flowVersion1, long runid1,
|
String flow, Configuration c1, String flowVersion1, long runid1,
|
||||||
|
@ -75,8 +75,8 @@ public class TestHBaseStorageFlowRun {
|
|||||||
|
|
||||||
private static HBaseTestingUtility util;
|
private static HBaseTestingUtility util;
|
||||||
|
|
||||||
private final String metric1 = "MAP_SLOT_MILLIS";
|
private static final String METRIC1 = "MAP_SLOT_MILLIS";
|
||||||
private final String metric2 = "HDFS_BYTES_READ";
|
private static final String METRIC2 = "HDFS_BYTES_READ";
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupBeforeClass() throws Exception {
|
public static void setupBeforeClass() throws Exception {
|
||||||
@ -213,8 +213,10 @@ public void testWriteFlowRunMinMax() throws Exception {
|
|||||||
// flush everything to hbase
|
// flush everything to hbase
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Connection conn = ConnectionFactory.createConnection(c1);
|
Connection conn = ConnectionFactory.createConnection(c1);
|
||||||
// check in flow run table
|
// check in flow run table
|
||||||
@ -257,9 +259,11 @@ public void testWriteFlowRunMinMax() throws Exception {
|
|||||||
assertEquals(minStartTs, flowRun.getStartTime());
|
assertEquals(minStartTs, flowRun.getStartTime());
|
||||||
assertEquals(endTs, flowRun.getMaxEndTime());
|
assertEquals(endTs, flowRun.getMaxEndTime());
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes two application entities of the same flow run. Each application has
|
* Writes two application entities of the same flow run. Each application has
|
||||||
@ -299,8 +303,10 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
|
|||||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check flow run
|
// check flow run
|
||||||
checkFlowRunTable(cluster, user, flow, runid, c1);
|
checkFlowRunTable(cluster, user, flow, runid, c1);
|
||||||
@ -327,10 +333,10 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
|
|||||||
value = n;
|
value = n;
|
||||||
}
|
}
|
||||||
switch (id) {
|
switch (id) {
|
||||||
case metric1:
|
case METRIC1:
|
||||||
assertEquals(141L, value);
|
assertEquals(141L, value);
|
||||||
break;
|
break;
|
||||||
case metric2:
|
case METRIC2:
|
||||||
assertEquals(57L, value);
|
assertEquals(57L, value);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -338,9 +344,11 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkFlowRunTable(String cluster, String user, String flow,
|
private void checkFlowRunTable(String cluster, String user, String flow,
|
||||||
long runid, Configuration c1) throws IOException {
|
long runid, Configuration c1) throws IOException {
|
||||||
@ -365,14 +373,14 @@ private void checkFlowRunTable(String cluster, String user, String flow,
|
|||||||
rowCount++;
|
rowCount++;
|
||||||
// check metric1
|
// check metric1
|
||||||
byte[] q = ColumnHelper.getColumnQualifier(
|
byte[] q = ColumnHelper.getColumnQualifier(
|
||||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
|
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC1);
|
||||||
assertTrue(values.containsKey(q));
|
assertTrue(values.containsKey(q));
|
||||||
assertEquals(141L, Bytes.toLong(values.get(q)));
|
assertEquals(141L, Bytes.toLong(values.get(q)));
|
||||||
|
|
||||||
// check metric2
|
// check metric2
|
||||||
assertEquals(3, values.size());
|
assertEquals(3, values.size());
|
||||||
q = ColumnHelper.getColumnQualifier(
|
q = ColumnHelper.getColumnQualifier(
|
||||||
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
|
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC2);
|
||||||
assertTrue(values.containsKey(q));
|
assertTrue(values.containsKey(q));
|
||||||
assertEquals(57L, Bytes.toLong(values.get(q)));
|
assertEquals(57L, Bytes.toLong(values.get(q)));
|
||||||
}
|
}
|
||||||
@ -407,8 +415,10 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
|
|||||||
hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
|
hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// use the timeline reader to verify data
|
// use the timeline reader to verify data
|
||||||
HBaseTimelineReaderImpl hbr = null;
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
@ -418,7 +428,7 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
|
|||||||
hbr.start();
|
hbr.start();
|
||||||
TimelineFilterList metricsToRetrieve = new TimelineFilterList(
|
TimelineFilterList metricsToRetrieve = new TimelineFilterList(
|
||||||
Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
||||||
metric1.substring(0, metric1.indexOf("_") + 1)));
|
METRIC1.substring(0, METRIC1.indexOf("_") + 1)));
|
||||||
TimelineEntity entity = hbr.getEntity(
|
TimelineEntity entity = hbr.getEntity(
|
||||||
new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
|
new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
|
||||||
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
||||||
@ -435,7 +445,7 @@ Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
|||||||
value = n;
|
value = n;
|
||||||
}
|
}
|
||||||
switch (id) {
|
switch (id) {
|
||||||
case metric1:
|
case METRIC1:
|
||||||
assertEquals(40L, value);
|
assertEquals(40L, value);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -455,9 +465,11 @@ Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
|||||||
}
|
}
|
||||||
assertEquals(2, metricCnt);
|
assertEquals(2, metricCnt);
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteFlowRunsMetricFields() throws Exception {
|
public void testWriteFlowRunsMetricFields() throws Exception {
|
||||||
@ -488,8 +500,10 @@ public void testWriteFlowRunsMetricFields() throws Exception {
|
|||||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check flow run
|
// check flow run
|
||||||
checkFlowRunTable(cluster, user, flow, runid, c1);
|
checkFlowRunTable(cluster, user, flow, runid, c1);
|
||||||
@ -528,10 +542,10 @@ public void testWriteFlowRunsMetricFields() throws Exception {
|
|||||||
value = n;
|
value = n;
|
||||||
}
|
}
|
||||||
switch (id) {
|
switch (id) {
|
||||||
case metric1:
|
case METRIC1:
|
||||||
assertEquals(141L, value);
|
assertEquals(141L, value);
|
||||||
break;
|
break;
|
||||||
case metric2:
|
case METRIC2:
|
||||||
assertEquals(57L, value);
|
assertEquals(57L, value);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -540,9 +554,11 @@ public void testWriteFlowRunsMetricFields() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteFlowRunFlush() throws Exception {
|
public void testWriteFlowRunFlush() throws Exception {
|
||||||
@ -595,8 +611,10 @@ public void testWriteFlowRunFlush() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
hbi.close();
|
hbi.close();
|
||||||
|
}
|
||||||
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
|
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
|
||||||
true);
|
true);
|
||||||
}
|
}
|
||||||
@ -665,8 +683,10 @@ public void testFilterFlowRunsByCreatedTime() throws Exception {
|
|||||||
"application_11111111111111_2222", te);
|
"application_11111111111111_2222", te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// use the timeline reader to verify data
|
// use the timeline reader to verify data
|
||||||
HBaseTimelineReaderImpl hbr = null;
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
@ -711,9 +731,11 @@ public void testFilterFlowRunsByCreatedTime() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetricFilters() throws Exception {
|
public void testMetricFilters() throws Exception {
|
||||||
@ -742,8 +764,10 @@ public void testMetricFilters() throws Exception {
|
|||||||
"application_11111111111111_2222", te);
|
"application_11111111111111_2222", te);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// use the timeline reader to verify data
|
// use the timeline reader to verify data
|
||||||
HBaseTimelineReaderImpl hbr = null;
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
@ -754,12 +778,12 @@ public void testMetricFilters() throws Exception {
|
|||||||
|
|
||||||
TimelineFilterList list1 = new TimelineFilterList();
|
TimelineFilterList list1 = new TimelineFilterList();
|
||||||
list1.addFilter(new TimelineCompareFilter(
|
list1.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
|
TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101));
|
||||||
TimelineFilterList list2 = new TimelineFilterList();
|
TimelineFilterList list2 = new TimelineFilterList();
|
||||||
list2.addFilter(new TimelineCompareFilter(
|
list2.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.LESS_THAN, metric1, 43));
|
TimelineCompareOp.LESS_THAN, METRIC1, 43));
|
||||||
list2.addFilter(new TimelineCompareFilter(
|
list2.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.EQUAL, metric2, 57));
|
TimelineCompareOp.EQUAL, METRIC2, 57));
|
||||||
TimelineFilterList metricFilterList =
|
TimelineFilterList metricFilterList =
|
||||||
new TimelineFilterList(Operator.OR, list1, list2);
|
new TimelineFilterList(Operator.OR, list1, list2);
|
||||||
Set<TimelineEntity> entities = hbr.getEntities(
|
Set<TimelineEntity> entities = hbr.getEntities(
|
||||||
@ -777,8 +801,8 @@ public void testMetricFilters() throws Exception {
|
|||||||
|
|
||||||
TimelineFilterList metricFilterList1 = new TimelineFilterList(
|
TimelineFilterList metricFilterList1 = new TimelineFilterList(
|
||||||
new TimelineCompareFilter(
|
new TimelineCompareFilter(
|
||||||
TimelineCompareOp.LESS_OR_EQUAL, metric1, 127),
|
TimelineCompareOp.LESS_OR_EQUAL, METRIC1, 127),
|
||||||
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 30));
|
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 30));
|
||||||
entities = hbr.getEntities(
|
entities = hbr.getEntities(
|
||||||
new TimelineReaderContext(cluster, user, flow, null, null,
|
new TimelineReaderContext(cluster, user, flow, null, null,
|
||||||
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
||||||
@ -793,8 +817,8 @@ public void testMetricFilters() throws Exception {
|
|||||||
assertEquals(2, metricCnt);
|
assertEquals(2, metricCnt);
|
||||||
|
|
||||||
TimelineFilterList metricFilterList2 = new TimelineFilterList(
|
TimelineFilterList metricFilterList2 = new TimelineFilterList(
|
||||||
new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, metric1, 32),
|
new TimelineCompareFilter(TimelineCompareOp.LESS_THAN, METRIC1, 32),
|
||||||
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, metric2, 57));
|
new TimelineCompareFilter(TimelineCompareOp.NOT_EQUAL, METRIC2, 57));
|
||||||
entities = hbr.getEntities(
|
entities = hbr.getEntities(
|
||||||
new TimelineReaderContext(cluster, user, flow, null, null,
|
new TimelineReaderContext(cluster, user, flow, null, null,
|
||||||
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
||||||
@ -815,17 +839,17 @@ public void testMetricFilters() throws Exception {
|
|||||||
|
|
||||||
TimelineFilterList list3 = new TimelineFilterList();
|
TimelineFilterList list3 = new TimelineFilterList();
|
||||||
list3.addFilter(new TimelineCompareFilter(
|
list3.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.GREATER_OR_EQUAL, metric1, 101));
|
TimelineCompareOp.GREATER_OR_EQUAL, METRIC1, 101));
|
||||||
TimelineFilterList list4 = new TimelineFilterList();
|
TimelineFilterList list4 = new TimelineFilterList();
|
||||||
list4.addFilter(new TimelineCompareFilter(
|
list4.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.LESS_THAN, metric1, 43));
|
TimelineCompareOp.LESS_THAN, METRIC1, 43));
|
||||||
list4.addFilter(new TimelineCompareFilter(
|
list4.addFilter(new TimelineCompareFilter(
|
||||||
TimelineCompareOp.EQUAL, metric2, 57));
|
TimelineCompareOp.EQUAL, METRIC2, 57));
|
||||||
TimelineFilterList metricFilterList4 =
|
TimelineFilterList metricFilterList4 =
|
||||||
new TimelineFilterList(Operator.OR, list3, list4);
|
new TimelineFilterList(Operator.OR, list3, list4);
|
||||||
TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
|
TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
|
||||||
new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
|
||||||
metric2.substring(0, metric2.indexOf("_") + 1)));
|
METRIC2.substring(0, METRIC2.indexOf("_") + 1)));
|
||||||
entities = hbr.getEntities(
|
entities = hbr.getEntities(
|
||||||
new TimelineReaderContext(cluster, user, flow, null, null,
|
new TimelineReaderContext(cluster, user, flow, null, null,
|
||||||
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
|
||||||
@ -840,9 +864,11 @@ public void testMetricFilters() throws Exception {
|
|||||||
}
|
}
|
||||||
assertEquals(1, metricCnt);
|
assertEquals(1, metricCnt);
|
||||||
} finally {
|
} finally {
|
||||||
|
if (hbr != null) {
|
||||||
hbr.close();
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
@ -69,8 +69,8 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||||||
|
|
||||||
private static HBaseTestingUtility util;
|
private static HBaseTestingUtility util;
|
||||||
|
|
||||||
private final String metric1 = "MAP_SLOT_MILLIS";
|
private static final String metric1 = "MAP_SLOT_MILLIS";
|
||||||
private final String metric2 = "HDFS_BYTES_READ";
|
private static final String metric2 = "HDFS_BYTES_READ";
|
||||||
|
|
||||||
private final byte[] aRowKey = Bytes.toBytes("a");
|
private final byte[] aRowKey = Bytes.toBytes("a");
|
||||||
private final byte[] aFamily = Bytes.toBytes("family");
|
private final byte[] aFamily = Bytes.toBytes("family");
|
||||||
@ -166,10 +166,12 @@ public void testWriteFlowRunCompaction() throws Exception {
|
|||||||
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
|
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
|
||||||
insertTs + 1, c1);
|
insertTs + 1, c1);
|
||||||
te1.addEntity(entityApp1);
|
te1.addEntity(entityApp1);
|
||||||
|
if (hbi != null) {
|
||||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
|
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
|
||||||
hbi.flush();
|
hbi.flush();
|
||||||
hbi.close();
|
hbi.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check in flow run table
|
// check in flow run table
|
||||||
HRegionServer server = util.getRSForFirstRegionInTable(TableName
|
HRegionServer server = util.getRSForFirstRegionInTable(TableName
|
||||||
|
Loading…
Reference in New Issue
Block a user