diff --git a/pom.xml b/pom.xml
index ef829ad..b5384f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,101 +1,136 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
- com.thinker
- flink-test
- 1.0-SNAPSHOT
+ com.thinker
+ flink-test
+ 1.0-SNAPSHOT
- flink-test
-
- http://www.example.com
+ flink-test
+
+ http://www.example.com
-
- UTF-8
- 1.7
- 1.7
-
+
+ UTF-8
+ 1.8
+ 1.8
+ 1.10.0
+ 2.11
+
-
+
-
- org.apache.flink
- flink-java
- 1.10.0
-
-
- org.apache.flink
- flink-streaming-java_2.11
- 1.10.0
-
-
- org.apache.flink
- flink-clients_2.11
- 1.10.0
-
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+
-
- org.apache.flink
- statefun-sdk
- 2.0.0
-
-
- org.apache.flink
- statefun-flink-harness
- 2.0.0
-
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.7
+ runtime
+
+
+ log4j
+ log4j
+ 1.2.17
+ runtime
+
+
+
+ org.apache.flink
+ flink-connector-kafka-0.11_${scala.binary.version}
+ ${flink.version}
+
-
- junit
- junit
- 4.11
- test
-
-
+
+ org.apache.flink
+ statefun-sdk
+ 2.0.0
+
+
+ org.apache.flink
+ statefun-flink-harness
+ 2.0.0
+
-
-
-
+
+
+ com.alibaba
+ fastjson
+ 1.2.51
+
-
-
- net.alchim31.maven
- scala-maven-plugin
- 3.4.6
-
-
-
-
- compile
- testCompile
-
-
-
-
+
+ mysql
+ mysql-connector-java
+ 5.1.34
+
-
- org.apache.maven.plugins
- maven-assembly-plugin
- 3.0.0
-
-
- jar-with-dependencies
-
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
+
+ junit
+ junit
+ 4.11
+ test
+
+
-
-
-
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.4.6
+
+
+
+
+ compile
+ testCompile
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.0.0
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/thinker/Main.java b/src/main/java/com/thinker/Main.java
new file mode 100644
index 0000000..146189c
--- /dev/null
+++ b/src/main/java/com/thinker/Main.java
@@ -0,0 +1,39 @@
+package com.thinker;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+
+import java.util.Properties;
+
+/**
+ * @author zeekling [lingzhaohui@zeekling.cn]
+ * @version 1.0
+ * @apiNote
+ * @since 2020-05-05
+ */
+public class Main {
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("zookeeper.connect", "localhost:2181");
+ props.put("group.id", "metric-group");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("auto.offset.reset", "latest"); //value 反序列化
+
+ DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
+ "metric", //kafka topic
+ new SimpleStringSchema(), // String 序列化
+ props)).setParallelism(1);
+
+ dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台
+
+ env.execute("Flink add data source");
+ }
+
+}
diff --git a/src/main/java/com/thinker/model/Metric.java b/src/main/java/com/thinker/model/Metric.java
new file mode 100644
index 0000000..f09ce45
--- /dev/null
+++ b/src/main/java/com/thinker/model/Metric.java
@@ -0,0 +1,71 @@
+package com.thinker.model;
+
+import java.util.Map;
+
/**
 * Plain data holder for one metric sample: a name, a timestamp, and free-form
 * field/tag maps. Instances are serialized to JSON and sent through Kafka.
 *
 * <p>Maps are parameterized as {@code Map<String, Object>} (the original used
 * raw types); this is erasure-compatible with existing raw-typed callers.
 *
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @since 2020-05-05
 */
public class Metric {

    private String name;
    // epoch milliseconds (set via System.currentTimeMillis() by producers)
    private long timestamp;
    // measurement values, e.g. "used_percent" -> 90.0
    private Map<String, Object> fields;
    // dimension labels, e.g. "cluster" -> "zhisheng"
    private Map<String, Object> tags;

    /** No-arg constructor required for JSON (de)serialization. */
    public Metric() {
    }

    public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, Object> tags) {
        this.name = name;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    @Override
    public String toString() {
        // Note: timestamp is rendered inside quotes to preserve the original format.
        return "Metric{" +
                "name='" + name + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", fields=" + fields +
                ", tags=" + tags +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, Object> getTags() {
        return tags;
    }

    public void setTags(Map<String, Object> tags) {
        this.tags = tags;
    }
}
diff --git a/src/main/java/com/thinker/util/KafkaUtils.java b/src/main/java/com/thinker/util/KafkaUtils.java
new file mode 100644
index 0000000..5b87997
--- /dev/null
+++ b/src/main/java/com/thinker/util/KafkaUtils.java
@@ -0,0 +1,61 @@
+package com.thinker.util;
+
+import com.thinker.model.Metric;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import com.alibaba.fastjson.JSON;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author zeekling [lingzhaohui@zeekling.cn]
+ * @version 1.0
+ * @apiNote 往 kafka 中写数据工具类:
+ * @since 2020-05-05
+ */
+public class KafkaUtils {
+
+ private static final String broker_list = "localhost:9092";
+ private static final String topic = "metric"; // kafka topic,Flink 程序中需要和这个统一
+
+ private static void writeToKafka() throws InterruptedException {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", broker_list);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
+ KafkaProducer producer = new KafkaProducer(props);
+
+ Metric metric = new Metric();
+ metric.setTimestamp(System.currentTimeMillis());
+ metric.setName("mem");
+ Map tags = new HashMap<>();
+ Map fields = new HashMap<>();
+
+ tags.put("cluster", "zhisheng");
+ tags.put("host_ip", "101.147.022.106");
+
+ fields.put("used_percent", 90d);
+ fields.put("max", 27244873d);
+ fields.put("used", 17244873d);
+ fields.put("init", 27244873d);
+
+ metric.setTags(tags);
+ metric.setFields(fields);
+
+ ProducerRecord record = new ProducerRecord(topic, null, null, JSON.toJSONString(metric));
+ producer.send(record);
+ System.out.println("发送数据: " + JSON.toJSONString(metric));
+
+ producer.flush();
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ while (true) {
+ Thread.sleep(300);
+ writeToKafka();
+ }
+ }
+
+}