YARN-6094. Update the coprocessor to be a dynamically loaded one. Contributed by Vrushali C.
This commit is contained in:
parent
5e0acee75e
commit
580d884913
@ -2146,6 +2146,18 @@ public static boolean isAclEnabled(Configuration conf) {
|
||||
+ "hbase.coprocessor.app-final-value-retention-milliseconds";
|
||||
|
||||
/**
|
||||
* The name of the setting for the location of the coprocessor
|
||||
* jar on hdfs.
|
||||
*/
|
||||
public static final String FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION =
|
||||
TIMELINE_SERVICE_PREFIX
|
||||
+ "hbase.coprocessor.jar.hdfs.location";
|
||||
|
||||
/** default hdfs location for flowrun coprocessor jar. */
|
||||
public static final String DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR =
|
||||
"/hbase/coprocessor/hadoop-yarn-server-timelineservice.jar";
|
||||
|
||||
/**
|
||||
* The name for setting that points to an optional HBase configuration
|
||||
* (hbase-site.xml file) with settings that will override the ones found on
|
||||
* the classpath.
|
||||
|
@ -2341,6 +2341,15 @@
|
||||
<value>259200000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The default hdfs location for flowrun coprocessor jar.
|
||||
</description>
|
||||
<name>yarn.timeline-service.hbase.coprocessor.jar.hdfs.location
|
||||
</name>
|
||||
<value>/hbase/coprocessor/hadoop-yarn-server-timelineservice.jar</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The value of this parameter sets the prefix for all tables that are part of
|
||||
|
@ -52,8 +52,8 @@
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
@ -89,7 +89,7 @@ public static void setup() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
DataGeneratorForTest.createSchema(conf);
|
||||
loadData();
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
@ -31,10 +32,33 @@
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
|
||||
final class DataGeneratorForTest {
|
||||
static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
/**
|
||||
* Utility class that creates the schema and generates test data.
|
||||
*/
|
||||
public final class DataGeneratorForTest {
|
||||
|
||||
// private constructor for utility class
|
||||
private DataGeneratorForTest() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the schema for timeline service.
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void createSchema(final Configuration conf)
|
||||
throws IOException {
|
||||
// set the jar location to null so that
|
||||
// the coprocessor class is loaded from classpath
|
||||
conf.set(YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION, " ");
|
||||
// now create all tables
|
||||
TimelineSchemaCreator.createAllTables(conf, false);
|
||||
}
|
||||
|
||||
public static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
TimelineEntities te = new TimelineEntities();
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "application_1111111111_2222";
|
||||
@ -43,11 +67,7 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
Long cTime = 1425016502000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue2");
|
||||
infoMap.put("infoMapKey2", 20);
|
||||
infoMap.put("infoMapKey3", 85.85);
|
||||
entity.addInfo(infoMap);
|
||||
entity.addInfo(getInfoMap3());
|
||||
// add the isRelatedToEntity info
|
||||
Set<String> isRelatedToSet = new HashSet<>();
|
||||
isRelatedToSet.add("relatedto1");
|
||||
@ -72,29 +92,15 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
entity.addConfigs(conf);
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics = new HashSet<>();
|
||||
TimelineMetric m1 = new TimelineMetric();
|
||||
m1.setId("MAP_SLOT_MILLIS");
|
||||
Map<Long, Number> metricValues = new HashMap<>();
|
||||
long ts = System.currentTimeMillis();
|
||||
metricValues.put(ts - 120000, 100000000);
|
||||
metricValues.put(ts - 100000, 200000000);
|
||||
metricValues.put(ts - 80000, 300000000);
|
||||
metricValues.put(ts - 60000, 400000000);
|
||||
metricValues.put(ts - 40000, 50000000000L);
|
||||
metricValues.put(ts - 20000, 60000000000L);
|
||||
m1.setType(Type.TIME_SERIES);
|
||||
m1.setValues(metricValues);
|
||||
metrics.add(m1);
|
||||
metrics.add(getMetric4(ts));
|
||||
|
||||
TimelineMetric m12 = new TimelineMetric();
|
||||
m12.setId("MAP1_BYTES");
|
||||
m12.addValue(ts, 50);
|
||||
metrics.add(m12);
|
||||
entity.addMetrics(metrics);
|
||||
TimelineEvent event = new TimelineEvent();
|
||||
event.setId("start_event");
|
||||
event.setTimestamp(ts);
|
||||
entity.addEvent(event);
|
||||
entity.addEvent(addStartEvent(ts));
|
||||
te.addEntity(entity);
|
||||
TimelineEntities te1 = new TimelineEntities();
|
||||
TimelineEntity entity1 = new TimelineEntity();
|
||||
@ -103,10 +109,7 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
entity1.setCreatedTime(cTime + 20L);
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap1 = new HashMap<>();
|
||||
infoMap1.put("infoMapKey1", "infoMapValue1");
|
||||
infoMap1.put("infoMapKey2", 10);
|
||||
entity1.addInfo(infoMap1);
|
||||
entity1.addInfo(getInfoMap4());
|
||||
|
||||
// add the isRelatedToEntity info
|
||||
Set<String> isRelatedToSet1 = new HashSet<>();
|
||||
@ -134,21 +137,7 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
entity1.addConfigs(conf1);
|
||||
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics1 = new HashSet<>();
|
||||
TimelineMetric m2 = new TimelineMetric();
|
||||
m2.setId("MAP1_SLOT_MILLIS");
|
||||
Map<Long, Number> metricValues1 = new HashMap<>();
|
||||
long ts1 = System.currentTimeMillis();
|
||||
metricValues1.put(ts1 - 120000, 100000000);
|
||||
metricValues1.put(ts1 - 100000, 200000000);
|
||||
metricValues1.put(ts1 - 80000, 300000000);
|
||||
metricValues1.put(ts1 - 60000, 400000000);
|
||||
metricValues1.put(ts1 - 40000, 50000000000L);
|
||||
metricValues1.put(ts1 - 20000, 60000000000L);
|
||||
m2.setType(Type.TIME_SERIES);
|
||||
m2.setValues(metricValues1);
|
||||
metrics1.add(m2);
|
||||
entity1.addMetrics(metrics1);
|
||||
entity1.addMetrics(getMetrics4());
|
||||
TimelineEvent event11 = new TimelineEvent();
|
||||
event11.setId("end_event");
|
||||
event11.setTimestamp(ts);
|
||||
@ -160,27 +149,7 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
te1.addEntity(entity1);
|
||||
|
||||
TimelineEntities te2 = new TimelineEntities();
|
||||
TimelineEntity entity2 = new TimelineEntity();
|
||||
String id2 = "application_1111111111_4444";
|
||||
entity2.setId(id2);
|
||||
entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
entity2.setCreatedTime(cTime + 40L);
|
||||
TimelineEvent event21 = new TimelineEvent();
|
||||
event21.setId("update_event");
|
||||
event21.setTimestamp(ts - 20);
|
||||
entity2.addEvent(event21);
|
||||
Set<String> isRelatedToSet2 = new HashSet<String>();
|
||||
isRelatedToSet2.add("relatedto3");
|
||||
Map<String, Set<String>> isRelatedTo2 = new HashMap<>();
|
||||
isRelatedTo2.put("task1", isRelatedToSet2);
|
||||
entity2.setIsRelatedToEntities(isRelatedTo2);
|
||||
Map<String, Set<String>> relatesTo3 = new HashMap<>();
|
||||
Set<String> relatesToSet14 = new HashSet<String>();
|
||||
relatesToSet14.add("relatesto7");
|
||||
relatesTo3.put("container2", relatesToSet14);
|
||||
entity2.setRelatesToEntities(relatesTo3);
|
||||
|
||||
te2.addEntity(entity2);
|
||||
te2.addEntity(getEntity4(cTime, ts));
|
||||
HBaseTimelineWriterImpl hbi = null;
|
||||
try {
|
||||
hbi = new HBaseTimelineWriterImpl();
|
||||
@ -206,7 +175,140 @@ static void loadApps(HBaseTestingUtility util) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
private static Set<TimelineMetric> getMetrics4() {
|
||||
Set<TimelineMetric> metrics1 = new HashSet<>();
|
||||
TimelineMetric m2 = new TimelineMetric();
|
||||
m2.setId("MAP1_SLOT_MILLIS");
|
||||
long ts1 = System.currentTimeMillis();
|
||||
Map<Long, Number> metricValues1 = new HashMap<>();
|
||||
metricValues1.put(ts1 - 120000, 100000000);
|
||||
metricValues1.put(ts1 - 100000, 200000000);
|
||||
metricValues1.put(ts1 - 80000, 300000000);
|
||||
metricValues1.put(ts1 - 60000, 400000000);
|
||||
metricValues1.put(ts1 - 40000, 50000000000L);
|
||||
metricValues1.put(ts1 - 20000, 60000000000L);
|
||||
m2.setType(Type.TIME_SERIES);
|
||||
m2.setValues(metricValues1);
|
||||
metrics1.add(m2);
|
||||
return metrics1;
|
||||
}
|
||||
|
||||
private static TimelineEntity getEntity4(long cTime, long ts) {
|
||||
TimelineEntity entity2 = new TimelineEntity();
|
||||
String id2 = "application_1111111111_4444";
|
||||
entity2.setId(id2);
|
||||
entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
entity2.setCreatedTime(cTime + 40L);
|
||||
TimelineEvent event21 = new TimelineEvent();
|
||||
event21.setId("update_event");
|
||||
event21.setTimestamp(ts - 20);
|
||||
entity2.addEvent(event21);
|
||||
Set<String> isRelatedToSet2 = new HashSet<String>();
|
||||
isRelatedToSet2.add("relatedto3");
|
||||
Map<String, Set<String>> isRelatedTo2 = new HashMap<>();
|
||||
isRelatedTo2.put("task1", isRelatedToSet2);
|
||||
entity2.setIsRelatedToEntities(isRelatedTo2);
|
||||
Map<String, Set<String>> relatesTo3 = new HashMap<>();
|
||||
Set<String> relatesToSet14 = new HashSet<String>();
|
||||
relatesToSet14.add("relatesto7");
|
||||
relatesTo3.put("container2", relatesToSet14);
|
||||
entity2.setRelatesToEntities(relatesTo3);
|
||||
return entity2;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInfoMap4() {
|
||||
Map<String, Object> infoMap1 = new HashMap<>();
|
||||
infoMap1.put("infoMapKey1", "infoMapValue1");
|
||||
infoMap1.put("infoMapKey2", 10);
|
||||
return infoMap1;
|
||||
}
|
||||
|
||||
private static TimelineMetric getMetric4(long ts) {
|
||||
TimelineMetric m1 = new TimelineMetric();
|
||||
m1.setId("MAP_SLOT_MILLIS");
|
||||
Map<Long, Number> metricValues = new HashMap<>();
|
||||
metricValues.put(ts - 120000, 100000000);
|
||||
metricValues.put(ts - 100000, 200000000);
|
||||
metricValues.put(ts - 80000, 300000000);
|
||||
metricValues.put(ts - 60000, 400000000);
|
||||
metricValues.put(ts - 40000, 50000000000L);
|
||||
metricValues.put(ts - 20000, 60000000000L);
|
||||
m1.setType(Type.TIME_SERIES);
|
||||
m1.setValues(metricValues);
|
||||
return m1;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInfoMap3() {
|
||||
Map<String, Object> infoMap = new HashMap<>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue2");
|
||||
infoMap.put("infoMapKey2", 20);
|
||||
infoMap.put("infoMapKey3", 85.85);
|
||||
return infoMap;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInfoMap1() {
|
||||
Map<String, Object> infoMap = new HashMap<>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue2");
|
||||
infoMap.put("infoMapKey2", 20);
|
||||
infoMap.put("infoMapKey3", 71.4);
|
||||
return infoMap;
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getRelatesTo1() {
|
||||
Set<String> relatesToSet = new HashSet<String>();
|
||||
relatesToSet.add("relatesto1");
|
||||
relatesToSet.add("relatesto3");
|
||||
Map<String, Set<String>> relatesTo = new HashMap<>();
|
||||
relatesTo.put("container", relatesToSet);
|
||||
Set<String> relatesToSet11 = new HashSet<>();
|
||||
relatesToSet11.add("relatesto4");
|
||||
relatesTo.put("container1", relatesToSet11);
|
||||
return relatesTo;
|
||||
}
|
||||
|
||||
private static Map<String, String> getConfig1() {
|
||||
Map<String, String> conf = new HashMap<>();
|
||||
conf.put("config_param1", "value1");
|
||||
conf.put("config_param2", "value2");
|
||||
conf.put("cfg_param1", "value3");
|
||||
return conf;
|
||||
}
|
||||
|
||||
private static Map<String, String> getConfig2() {
|
||||
Map<String, String> conf1 = new HashMap<>();
|
||||
conf1.put("cfg_param1", "value1");
|
||||
conf1.put("cfg_param2", "value2");
|
||||
return conf1;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInfoMap2() {
|
||||
Map<String, Object> infoMap1 = new HashMap<>();
|
||||
infoMap1.put("infoMapKey1", "infoMapValue1");
|
||||
infoMap1.put("infoMapKey2", 10);
|
||||
return infoMap1;
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getIsRelatedTo1() {
|
||||
Set<String> isRelatedToSet = new HashSet<>();
|
||||
isRelatedToSet.add("relatedto1");
|
||||
Map<String, Set<String>> isRelatedTo = new HashMap<>();
|
||||
isRelatedTo.put("task", isRelatedToSet);
|
||||
return isRelatedTo;
|
||||
}
|
||||
|
||||
private static Map<Long, Number> getMetricValues1(long ts) {
|
||||
Map<Long, Number> metricValues = new HashMap<>();
|
||||
metricValues.put(ts - 120000, 100000000);
|
||||
metricValues.put(ts - 100000, 200000000);
|
||||
metricValues.put(ts - 80000, 300000000);
|
||||
metricValues.put(ts - 60000, 400000000);
|
||||
metricValues.put(ts - 40000, 50000000000L);
|
||||
metricValues.put(ts - 20000, 70000000000L);
|
||||
return metricValues;
|
||||
}
|
||||
|
||||
public static void loadEntities(HBaseTestingUtility util)
|
||||
throws IOException {
|
||||
TimelineEntities te = new TimelineEntities();
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "hello";
|
||||
@ -216,50 +318,23 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
Long cTime = 1425016502000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue2");
|
||||
infoMap.put("infoMapKey2", 20);
|
||||
infoMap.put("infoMapKey3", 71.4);
|
||||
entity.addInfo(infoMap);
|
||||
entity.addInfo(getInfoMap1());
|
||||
// add the isRelatedToEntity info
|
||||
Set<String> isRelatedToSet = new HashSet<>();
|
||||
isRelatedToSet.add("relatedto1");
|
||||
Map<String, Set<String>> isRelatedTo = new HashMap<>();
|
||||
isRelatedTo.put("task", isRelatedToSet);
|
||||
entity.setIsRelatedToEntities(isRelatedTo);
|
||||
entity.setIsRelatedToEntities(getIsRelatedTo1());
|
||||
|
||||
// add the relatesTo info
|
||||
Set<String> relatesToSet = new HashSet<String>();
|
||||
relatesToSet.add("relatesto1");
|
||||
relatesToSet.add("relatesto3");
|
||||
Map<String, Set<String>> relatesTo = new HashMap<>();
|
||||
relatesTo.put("container", relatesToSet);
|
||||
Set<String> relatesToSet11 = new HashSet<>();
|
||||
relatesToSet11.add("relatesto4");
|
||||
relatesTo.put("container1", relatesToSet11);
|
||||
entity.setRelatesToEntities(relatesTo);
|
||||
entity.setRelatesToEntities(getRelatesTo1());
|
||||
|
||||
// add some config entries
|
||||
Map<String, String> conf = new HashMap<>();
|
||||
conf.put("config_param1", "value1");
|
||||
conf.put("config_param2", "value2");
|
||||
conf.put("cfg_param1", "value3");
|
||||
entity.addConfigs(conf);
|
||||
entity.addConfigs(getConfig1());
|
||||
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics = new HashSet<>();
|
||||
TimelineMetric m1 = new TimelineMetric();
|
||||
m1.setId("MAP_SLOT_MILLIS");
|
||||
Map<Long, Number> metricValues = new HashMap<>();
|
||||
long ts = System.currentTimeMillis();
|
||||
metricValues.put(ts - 120000, 100000000);
|
||||
metricValues.put(ts - 100000, 200000000);
|
||||
metricValues.put(ts - 80000, 300000000);
|
||||
metricValues.put(ts - 60000, 400000000);
|
||||
metricValues.put(ts - 40000, 50000000000L);
|
||||
metricValues.put(ts - 20000, 70000000000L);
|
||||
m1.setType(Type.TIME_SERIES);
|
||||
m1.setValues(metricValues);
|
||||
m1.setValues(getMetricValues1(ts));
|
||||
metrics.add(m1);
|
||||
|
||||
TimelineMetric m12 = new TimelineMetric();
|
||||
@ -267,10 +342,7 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
m12.addValue(ts, 50);
|
||||
metrics.add(m12);
|
||||
entity.addMetrics(metrics);
|
||||
TimelineEvent event = new TimelineEvent();
|
||||
event.setId("start_event");
|
||||
event.setTimestamp(ts);
|
||||
entity.addEvent(event);
|
||||
entity.addEvent(addStartEvent(ts));
|
||||
te.addEntity(entity);
|
||||
|
||||
TimelineEntity entity1 = new TimelineEntity();
|
||||
@ -280,10 +352,7 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
entity1.setCreatedTime(cTime + 20L);
|
||||
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap1 = new HashMap<>();
|
||||
infoMap1.put("infoMapKey1", "infoMapValue1");
|
||||
infoMap1.put("infoMapKey2", 10);
|
||||
entity1.addInfo(infoMap1);
|
||||
entity1.addInfo(getInfoMap2());
|
||||
|
||||
// add event.
|
||||
TimelineEvent event11 = new TimelineEvent();
|
||||
@ -297,15 +366,7 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
|
||||
|
||||
// add the isRelatedToEntity info
|
||||
Set<String> isRelatedToSet1 = new HashSet<>();
|
||||
isRelatedToSet1.add("relatedto3");
|
||||
isRelatedToSet1.add("relatedto5");
|
||||
Map<String, Set<String>> isRelatedTo1 = new HashMap<>();
|
||||
isRelatedTo1.put("task1", isRelatedToSet1);
|
||||
Set<String> isRelatedToSet11 = new HashSet<>();
|
||||
isRelatedToSet11.add("relatedto4");
|
||||
isRelatedTo1.put("task2", isRelatedToSet11);
|
||||
entity1.setIsRelatedToEntities(isRelatedTo1);
|
||||
entity1.setIsRelatedToEntities(getIsRelatedTo2());
|
||||
|
||||
// add the relatesTo info
|
||||
Set<String> relatesToSet1 = new HashSet<String>();
|
||||
@ -316,49 +377,20 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
entity1.setRelatesToEntities(relatesTo1);
|
||||
|
||||
// add some config entries
|
||||
Map<String, String> conf1 = new HashMap<>();
|
||||
conf1.put("cfg_param1", "value1");
|
||||
conf1.put("cfg_param2", "value2");
|
||||
entity1.addConfigs(conf1);
|
||||
entity1.addConfigs(getConfig2());
|
||||
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics1 = new HashSet<>();
|
||||
TimelineMetric m2 = new TimelineMetric();
|
||||
m2.setId("MAP1_SLOT_MILLIS");
|
||||
Map<Long, Number> metricValues1 = new HashMap<>();
|
||||
long ts1 = System.currentTimeMillis();
|
||||
metricValues1.put(ts1 - 120000, 100000000);
|
||||
metricValues1.put(ts1 - 100000, 200000000);
|
||||
metricValues1.put(ts1 - 80000, 300000000);
|
||||
metricValues1.put(ts1 - 60000, 400000000);
|
||||
metricValues1.put(ts1 - 40000, 50000000000L);
|
||||
metricValues1.put(ts1 - 20000, 60000000000L);
|
||||
m2.setType(Type.TIME_SERIES);
|
||||
m2.setValues(metricValues1);
|
||||
m2.setValues(getMetricValues2(ts1));
|
||||
metrics1.add(m2);
|
||||
entity1.addMetrics(metrics1);
|
||||
te.addEntity(entity1);
|
||||
|
||||
TimelineEntity entity2 = new TimelineEntity();
|
||||
String id2 = "hello2";
|
||||
entity2.setId(id2);
|
||||
entity2.setType(type);
|
||||
entity2.setCreatedTime(cTime + 40L);
|
||||
TimelineEvent event21 = new TimelineEvent();
|
||||
event21.setId("update_event");
|
||||
event21.setTimestamp(ts - 20);
|
||||
entity2.addEvent(event21);
|
||||
Set<String> isRelatedToSet2 = new HashSet<>();
|
||||
isRelatedToSet2.add("relatedto3");
|
||||
Map<String, Set<String>> isRelatedTo2 = new HashMap<>();
|
||||
isRelatedTo2.put("task1", isRelatedToSet2);
|
||||
entity2.setIsRelatedToEntities(isRelatedTo2);
|
||||
Map<String, Set<String>> relatesTo3 = new HashMap<>();
|
||||
Set<String> relatesToSet14 = new HashSet<>();
|
||||
relatesToSet14.add("relatesto7");
|
||||
relatesTo3.put("container2", relatesToSet14);
|
||||
entity2.setRelatesToEntities(relatesTo3);
|
||||
te.addEntity(entity2);
|
||||
te.addEntity(getEntity2(type, cTime, ts));
|
||||
|
||||
// For listing types
|
||||
for (int i = 0; i < 10; i++) {
|
||||
@ -421,4 +453,58 @@ static void loadEntities(HBaseTestingUtility util) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static TimelineEntity getEntity2(String type, long cTime,
|
||||
long ts) {
|
||||
TimelineEntity entity2 = new TimelineEntity();
|
||||
String id2 = "hello2";
|
||||
entity2.setId(id2);
|
||||
entity2.setType(type);
|
||||
entity2.setCreatedTime(cTime + 40L);
|
||||
TimelineEvent event21 = new TimelineEvent();
|
||||
event21.setId("update_event");
|
||||
event21.setTimestamp(ts - 20);
|
||||
entity2.addEvent(event21);
|
||||
Set<String> isRelatedToSet2 = new HashSet<>();
|
||||
isRelatedToSet2.add("relatedto3");
|
||||
Map<String, Set<String>> isRelatedTo2 = new HashMap<>();
|
||||
isRelatedTo2.put("task1", isRelatedToSet2);
|
||||
entity2.setIsRelatedToEntities(isRelatedTo2);
|
||||
Map<String, Set<String>> relatesTo3 = new HashMap<>();
|
||||
Set<String> relatesToSet14 = new HashSet<>();
|
||||
relatesToSet14.add("relatesto7");
|
||||
relatesTo3.put("container2", relatesToSet14);
|
||||
entity2.setRelatesToEntities(relatesTo3);
|
||||
return entity2;
|
||||
}
|
||||
|
||||
private static TimelineEvent addStartEvent(long ts) {
|
||||
TimelineEvent event = new TimelineEvent();
|
||||
event.setId("start_event");
|
||||
event.setTimestamp(ts);
|
||||
return event;
|
||||
}
|
||||
|
||||
private static Map<Long, Number> getMetricValues2(long ts1) {
|
||||
Map<Long, Number> metricValues1 = new HashMap<>();
|
||||
metricValues1.put(ts1 - 120000, 100000000);
|
||||
metricValues1.put(ts1 - 100000, 200000000);
|
||||
metricValues1.put(ts1 - 80000, 300000000);
|
||||
metricValues1.put(ts1 - 60000, 400000000);
|
||||
metricValues1.put(ts1 - 40000, 50000000000L);
|
||||
metricValues1.put(ts1 - 20000, 60000000000L);
|
||||
return metricValues1;
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getIsRelatedTo2() {
|
||||
Set<String> isRelatedToSet1 = new HashSet<>();
|
||||
isRelatedToSet1.add("relatedto3");
|
||||
isRelatedToSet1.add("relatedto5");
|
||||
Map<String, Set<String>> isRelatedTo1 = new HashMap<>();
|
||||
isRelatedTo1.put("task1", isRelatedToSet1);
|
||||
Set<String> isRelatedToSet11 = new HashSet<>();
|
||||
isRelatedToSet11.add("relatedto4");
|
||||
isRelatedTo1.put("task2", isRelatedToSet11);
|
||||
return isRelatedTo1;
|
||||
}
|
||||
}
|
||||
|
@ -92,14 +92,10 @@ public class TestHBaseTimelineStorageApps {
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
util.startMiniCluster();
|
||||
createSchema();
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
DataGeneratorForTest.loadApps(util);
|
||||
}
|
||||
|
||||
private static void createSchema() throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
reader = new HBaseTimelineReaderImpl();
|
||||
|
@ -99,14 +99,10 @@ public class TestHBaseTimelineStorageEntities {
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
util.startMiniCluster();
|
||||
createSchema();
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
DataGeneratorForTest.loadEntities(util);
|
||||
}
|
||||
|
||||
private static void createSchema() throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
reader = new HBaseTimelineReaderImpl();
|
||||
|
@ -48,17 +48,15 @@ public class TestHBaseTimelineStorageSchema {
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
private static void createSchema(Configuration conf) throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(conf, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithDefaultPrefix() throws IOException {
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
createSchema(hbaseConf);
|
||||
DataGeneratorForTest.createSchema(hbaseConf);
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Admin admin = conn.getAdmin();
|
||||
@ -88,7 +86,7 @@ public void createWithSetPrefix() throws IOException {
|
||||
String prefix = "unit-test.";
|
||||
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
|
||||
prefix);
|
||||
createSchema(hbaseConf);
|
||||
DataGeneratorForTest.createSchema(hbaseConf);
|
||||
Connection conn = null;
|
||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||
Admin admin = conn.getAdmin();
|
||||
@ -115,7 +113,7 @@ public void createWithSetPrefix() throws IOException {
|
||||
prefix = "yet-another-unit-test.";
|
||||
hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME,
|
||||
prefix);
|
||||
createSchema(hbaseConf);
|
||||
DataGeneratorForTest.createSchema(hbaseConf);
|
||||
entityTableName = BaseTable.getTableName(hbaseConf,
|
||||
EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME);
|
||||
assertTrue(admin.tableExists(entityTableName));
|
||||
|
@ -33,7 +33,7 @@
|
||||
/**
|
||||
* Generates the data/entities for the FlowRun and FlowActivity Tables.
|
||||
*/
|
||||
final class TestFlowDataGenerator {
|
||||
public final class TestFlowDataGenerator {
|
||||
private TestFlowDataGenerator() {
|
||||
}
|
||||
|
||||
@ -41,7 +41,8 @@ private TestFlowDataGenerator() {
|
||||
private static final String METRIC_2 = "HDFS_BYTES_READ";
|
||||
public static final long END_TS_INCR = 10000L;
|
||||
|
||||
static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
|
||||
public static TimelineEntity getEntityMetricsApp1(long insertTs,
|
||||
Configuration c1) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunMetrics_test";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -83,7 +84,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
|
||||
}
|
||||
|
||||
|
||||
static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
|
||||
public static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
|
||||
Configuration c1) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunMetrics_test";
|
||||
@ -125,7 +126,7 @@ static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
|
||||
}
|
||||
|
||||
|
||||
static TimelineEntity getEntityMetricsApp1(long insertTs) {
|
||||
public static TimelineEntity getEntityMetricsApp1(long insertTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunMetrics_test";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -168,8 +169,7 @@ static TimelineEntity getEntityMetricsApp1(long insertTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
|
||||
static TimelineEntity getEntityMetricsApp2(long insertTs) {
|
||||
public static TimelineEntity getEntityMetricsApp2(long insertTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunMetrics_test";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -200,7 +200,7 @@ static TimelineEntity getEntityMetricsApp2(long insertTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getEntity1() {
|
||||
public static TimelineEntity getEntity1() {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunHello";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -243,7 +243,7 @@ static TimelineEntity getEntity1() {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getAFullEntity(long ts, long endTs) {
|
||||
public static TimelineEntity getAFullEntity(long ts, long endTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunFullEntity";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -292,7 +292,7 @@ static TimelineEntity getAFullEntity(long ts, long endTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getEntityGreaterStartTime(long startTs) {
|
||||
public static TimelineEntity getEntityGreaterStartTime(long startTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setCreatedTime(startTs);
|
||||
entity.setId("flowRunHello with greater start time");
|
||||
@ -308,7 +308,7 @@ static TimelineEntity getEntityGreaterStartTime(long startTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getEntityMaxEndTime(long endTs) {
|
||||
public static TimelineEntity getEntityMaxEndTime(long endTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setId("flowRunHello Max End time");
|
||||
entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
@ -322,7 +322,7 @@ static TimelineEntity getEntityMaxEndTime(long endTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getEntityMinStartTime(long startTs) {
|
||||
public static TimelineEntity getEntityMinStartTime(long startTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunHelloMInStartTime";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -336,7 +336,7 @@ static TimelineEntity getEntityMinStartTime(long startTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getMinFlushEntity(long startTs) {
|
||||
public static TimelineEntity getMinFlushEntity(long startTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunHelloFlushEntityMin";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -350,7 +350,7 @@ static TimelineEntity getMinFlushEntity(long startTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getMaxFlushEntity(long startTs) {
|
||||
public static TimelineEntity getMaxFlushEntity(long startTs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunHelloFlushEntityMax";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
@ -365,7 +365,7 @@ static TimelineEntity getMaxFlushEntity(long startTs) {
|
||||
return entity;
|
||||
}
|
||||
|
||||
static TimelineEntity getFlowApp1(long appCreatedTime) {
|
||||
public static TimelineEntity getFlowApp1(long appCreatedTime) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowActivity_test";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
|
@ -47,9 +47,9 @@
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||
@ -70,11 +70,7 @@ public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
createSchema();
|
||||
}
|
||||
|
||||
private static void createSchema() throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -19,7 +19,6 @@
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -41,8 +40,8 @@
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
@ -57,10 +56,10 @@
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||
@ -84,11 +83,7 @@ public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
createSchema();
|
||||
}
|
||||
|
||||
private static void createSchema() throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -106,12 +101,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
|
||||
// check the regions.
|
||||
// check in flow run table
|
||||
util.waitUntilAllRegionsAssigned(table);
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
}
|
||||
checkCoprocessorExists(table, true);
|
||||
}
|
||||
|
||||
table = BaseTable.getTableName(hbaseConf,
|
||||
@ -121,12 +111,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
|
||||
// check the regions.
|
||||
// check in flow activity table
|
||||
util.waitUntilAllRegionsAssigned(table);
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
}
|
||||
checkCoprocessorExists(table, false);
|
||||
}
|
||||
|
||||
table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME,
|
||||
@ -135,12 +120,23 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
|
||||
// check the regions.
|
||||
// check in entity run table
|
||||
util.waitUntilAllRegionsAssigned(table);
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(),
|
||||
hbaseConf));
|
||||
checkCoprocessorExists(table, false);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkCoprocessorExists(TableName table, boolean exists)
|
||||
throws IOException, InterruptedException {
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
boolean found = false;
|
||||
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
|
||||
for (String coprocName : coprocs) {
|
||||
if (coprocName.contains("FlowRunCoprocessor")) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
assertEquals(found, exists);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,8 +50,8 @@
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||
@ -82,11 +82,7 @@ public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
util.startMiniCluster();
|
||||
createSchema();
|
||||
}
|
||||
|
||||
private static void createSchema() throws IOException {
|
||||
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
|
||||
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -58,7 +58,6 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FlowRunCoprocessor.class);
|
||||
private boolean isFlowRunRegion = false;
|
||||
|
||||
private Region region;
|
||||
/**
|
||||
@ -72,15 +71,9 @@ public void start(CoprocessorEnvironment e) throws IOException {
|
||||
if (e instanceof RegionCoprocessorEnvironment) {
|
||||
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
|
||||
this.region = env.getRegion();
|
||||
isFlowRunRegion = FlowRunTable.isFlowRunTable(
|
||||
region.getRegionInfo(), env.getConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isFlowRunRegion() {
|
||||
return isFlowRunRegion;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
@ -100,10 +93,6 @@ public boolean isFlowRunRegion() {
|
||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
|
||||
WALEdit edit, Durability durability) throws IOException {
|
||||
Map<String, byte[]> attributes = put.getAttributesMap();
|
||||
|
||||
if (!isFlowRunRegion) {
|
||||
return;
|
||||
}
|
||||
// Assumption is that all the cells in a put are the same operation.
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
if ((attributes != null) && (attributes.size() > 0)) {
|
||||
@ -171,10 +160,6 @@ private long getCellTimestamp(long timestamp, List<Tag> tags) {
|
||||
@Override
|
||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Get get, List<Cell> results) throws IOException {
|
||||
if (!isFlowRunRegion) {
|
||||
return;
|
||||
}
|
||||
|
||||
Scan scan = new Scan(get);
|
||||
scan.setMaxVersions();
|
||||
RegionScanner scanner = null;
|
||||
@ -206,12 +191,9 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
public RegionScanner preScannerOpen(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner scanner) throws IOException {
|
||||
|
||||
if (isFlowRunRegion) {
|
||||
// set max versions for scan to see all
|
||||
// versions to aggregate for metrics
|
||||
scan.setMaxVersions();
|
||||
}
|
||||
// set max versions for scan to see all
|
||||
// versions to aggregate for metrics
|
||||
scan.setMaxVersions();
|
||||
return scanner;
|
||||
}
|
||||
|
||||
@ -231,9 +213,6 @@ public RegionScanner preScannerOpen(
|
||||
public RegionScanner postScannerOpen(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner scanner) throws IOException {
|
||||
if (!isFlowRunRegion) {
|
||||
return scanner;
|
||||
}
|
||||
return new FlowScanner(e.getEnvironment(), scan,
|
||||
scanner, FlowScannerOperation.READ);
|
||||
}
|
||||
@ -242,9 +221,6 @@ public RegionScanner postScannerOpen(
|
||||
public InternalScanner preFlush(
|
||||
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner) throws IOException {
|
||||
if (!isFlowRunRegion) {
|
||||
return scanner;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (store != null) {
|
||||
LOG.debug("preFlush store = " + store.getColumnFamilyName()
|
||||
@ -265,9 +241,6 @@ public InternalScanner preFlush(
|
||||
@Override
|
||||
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, StoreFile resultFile) {
|
||||
if (!isFlowRunRegion) {
|
||||
return;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (store != null) {
|
||||
LOG.debug("postFlush store = " + store.getColumnFamilyName()
|
||||
@ -289,9 +262,6 @@ public InternalScanner preCompact(
|
||||
InternalScanner scanner, ScanType scanType, CompactionRequest request)
|
||||
throws IOException {
|
||||
|
||||
if (!isFlowRunRegion) {
|
||||
return scanner;
|
||||
}
|
||||
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
|
||||
if (request != null) {
|
||||
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
|
||||
|
@ -21,7 +21,6 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
@ -30,6 +29,8 @@
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
|
||||
/**
|
||||
* The flow run table has column family info
|
||||
@ -134,29 +135,17 @@ public void createTable(Admin admin, Configuration hbaseConf)
|
||||
infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
|
||||
|
||||
// TODO: figure the split policy
|
||||
flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
|
||||
.getCanonicalName());
|
||||
String coprocessorJarPathStr = hbaseConf.get(
|
||||
YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION,
|
||||
YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR);
|
||||
|
||||
Path coprocessorJarPath = new Path(coprocessorJarPathStr);
|
||||
LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString());
|
||||
flowRunTableDescp.addCoprocessor(
|
||||
FlowRunCoprocessor.class.getCanonicalName(), coprocessorJarPath,
|
||||
Coprocessor.PRIORITY_USER, null);
|
||||
admin.createTable(flowRunTableDescp);
|
||||
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
||||
+ admin.tableExists(table));
|
||||
}
|
||||
|
||||
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
|
||||
Configuration conf) {
|
||||
String regionTableName = hRegionInfo.getTable().getNameAsString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("regionTableName=" + regionTableName);
|
||||
}
|
||||
String flowRunTableName = BaseTable.getTableName(conf,
|
||||
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)
|
||||
.getNameAsString();
|
||||
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" table is the flow run table!! "
|
||||
+ flowRunTableName);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -209,26 +209,28 @@ http://hbase.apache.org/book.html#standalone.over.hdfs .
|
||||
Once you have an Apache HBase cluster ready to use, perform the following steps.
|
||||
|
||||
##### <a name="Enable_the_coprocessor"> </a>Step 2) Enable the coprocessor
|
||||
In this version, the coprocessor is loaded dynamically (table coprocessor for the `flowrun` table).
|
||||
|
||||
Step 2.1) Add the timeline service jar to the HBase classpath in all HBase machines in the cluster. It
|
||||
is needed for the coprocessor as well as the schema creator. For example,
|
||||
Copy the timeline service jar to HDFS from where HBase can load it. It
|
||||
is needed for the `flowrun` table creation in the schema creator. The default HDFS location is `/hbase/coprocessor`.
|
||||
For example,
|
||||
|
||||
cp hadoop-yarn-server-timelineservice-hbase-3.0.0-alpha1-SNAPSHOT.jar /usr/hbase/lib/
|
||||
hadoop fs -mkdir /hbase/coprocessor
|
||||
hadoop fs -put hadoop-yarn-server-timelineservice-3.0.0-alpha1-SNAPSHOT.jar
|
||||
/hbase/coprocessor/hadoop-yarn-server-timelineservice.jar
|
||||
|
||||
Step 2.2) Enable the coprocessor that handles the aggregation. To enable it, add the following entry in
|
||||
region servers' `hbase-site.xml` file (generally located in the `conf` directory) as follows:
|
||||
|
||||
If you want to place the jar at a different location on hdfs, there also exists a yarn
|
||||
configuration setting called `yarn.timeline-service.hbase.coprocessor.jar.hdfs.location`.
|
||||
For example,
|
||||
|
||||
```
|
||||
<property>
|
||||
<name>hbase.coprocessor.region.classes</name>
|
||||
<value>org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunCoprocessor</value>
|
||||
<name>yarn.timeline-service.hbase.coprocessor.jar.hdfs.location</name>
|
||||
<value>/custom/hdfs/path/jarName</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
Step 2.3) Restart the region servers and the master to pick up the timeline service jar as well
|
||||
as the config change. In this version, the coprocessor is loaded statically
|
||||
(i.e. system coprocessor) as opposed to a dynamically (table coprocessor).
|
||||
|
||||
##### <a name="Create_schema"> </a>Step 3) Create the timeline service schema
|
||||
Finally, run the schema creator tool to create the necessary tables:
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user