From 01c707d62ef7fe75a184fb9264f5cc54f9982026 Mon Sep 17 00:00:00 2001 From: zeek <984294471@qq.com> Date: Tue, 5 May 2020 14:07:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0pom?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 207 ++++++++++-------- src/main/java/com/thinker/Main.java | 39 ++++ src/main/java/com/thinker/model/Metric.java | 71 ++++++ .../java/com/thinker/util/KafkaUtils.java | 61 ++++++ 4 files changed, 292 insertions(+), 86 deletions(-) create mode 100644 src/main/java/com/thinker/Main.java create mode 100644 src/main/java/com/thinker/model/Metric.java create mode 100644 src/main/java/com/thinker/util/KafkaUtils.java diff --git a/pom.xml b/pom.xml index ef829ad..b5384f6 100644 --- a/pom.xml +++ b/pom.xml @@ -1,101 +1,136 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - com.thinker - flink-test - 1.0-SNAPSHOT + com.thinker + flink-test + 1.0-SNAPSHOT - flink-test - - http://www.example.com + flink-test + + http://www.example.com - - UTF-8 - 1.7 - 1.7 - + + UTF-8 + 1.8 + 1.8 + 1.10.0 + 2.11 + - + - - org.apache.flink - flink-java - 1.10.0 - - - org.apache.flink - flink-streaming-java_2.11 - 1.10.0 - - - org.apache.flink - flink-clients_2.11 - 1.10.0 - + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + - - org.apache.flink - statefun-sdk - 2.0.0 - - - org.apache.flink - statefun-flink-harness - 2.0.0 - + + + org.slf4j + slf4j-log4j12 + 1.7.7 + runtime + + + log4j + log4j + 1.2.17 + runtime + + + + org.apache.flink + flink-connector-kafka-0.11_${scala.binary.version} + ${flink.version} + - - junit - junit - 4.11 - test - - + + org.apache.flink + statefun-sdk + 2.0.0 + + + org.apache.flink + statefun-flink-harness + 2.0.0 + - - - + + + com.alibaba + fastjson + 1.2.51 + - - - net.alchim31.maven - scala-maven-plugin - 3.4.6 - - - - - compile - testCompile - - - - + + mysql + mysql-connector-java + 5.1.34 + - - org.apache.maven.plugins - maven-assembly-plugin - 3.0.0 - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - + + junit + junit + 4.11 + test + + - - - + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.4.6 + + + + + compile + testCompile + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + diff --git a/src/main/java/com/thinker/Main.java b/src/main/java/com/thinker/Main.java new file mode 100644 index 0000000..146189c --- /dev/null +++ b/src/main/java/com/thinker/Main.java @@ -0,0 +1,39 @@ +package com.thinker; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; + +import java.util.Properties; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote + * @since 2020-05-05 + */ +public class Main { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("zookeeper.connect", "localhost:2181"); + props.put("group.id", "metric-group"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("auto.offset.reset", "latest"); //value 反序列化 + + DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( + "metric", //kafka topic + new SimpleStringSchema(), // String 序列化 + props)).setParallelism(1); + + dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 + + env.execute("Flink add data source"); + } + +} diff --git a/src/main/java/com/thinker/model/Metric.java b/src/main/java/com/thinker/model/Metric.java new file mode 100644 index 0000000..f09ce45 --- /dev/null +++ b/src/main/java/com/thinker/model/Metric.java @@ -0,0 +1,71 @@ +package com.thinker.model; + +import java.util.Map; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote 实体类 + * @since 2020-05-05 + */ +public class Metric { + + + private String name; + private long timestamp; + private Map fields; + private Map tags; + + + public Metric() { + } + + public Metric(String name, long timestamp, Map fields, Map tags) { + this.name = name; + this.timestamp = timestamp; + this.fields = fields; + this.tags = tags; + } + + @Override + public String toString() { + return "Metric{" + + "name='" + name + '\'' + + ", timestamp='" + timestamp + '\'' + + ", fields=" + fields + + ", tags=" + tags + + '}'; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public Map getFields() { + return fields; + } + + public void setFields(Map fields) { + this.fields = fields; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } +} diff --git a/src/main/java/com/thinker/util/KafkaUtils.java b/src/main/java/com/thinker/util/KafkaUtils.java new file mode 100644 index 0000000..5b87997 --- /dev/null +++ b/src/main/java/com/thinker/util/KafkaUtils.java @@ -0,0 +1,61 @@ +package com.thinker.util; + +import com.thinker.model.Metric; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import com.alibaba.fastjson.JSON; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote 往 kafka 中写数据工具类: + * @since 2020-05-05 + */ +public class KafkaUtils { + + private static final String broker_list = "localhost:9092"; + private static final String topic = "metric"; // kafka topic,Flink 程序中需要和这个统一 + + private static void writeToKafka() throws InterruptedException { + Properties props = new Properties(); + props.put("bootstrap.servers", broker_list); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化 + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化 + KafkaProducer producer = new KafkaProducer(props); + + Metric metric = new Metric(); + metric.setTimestamp(System.currentTimeMillis()); + metric.setName("mem"); + Map tags = new HashMap<>(); + Map fields = new HashMap<>(); + + tags.put("cluster", "zhisheng"); + tags.put("host_ip", "101.147.022.106"); + + fields.put("used_percent", 90d); + fields.put("max", 27244873d); + fields.put("used", 17244873d); + fields.put("init", 27244873d); + + metric.setTags(tags); + metric.setFields(fields); + + ProducerRecord record = new ProducerRecord(topic, null, null, JSON.toJSONString(metric)); + producer.send(record); + System.out.println("发送数据: " + JSON.toJSONString(metric)); + + producer.flush(); + } + + public static void main(String[] args) throws InterruptedException { + while (true) { + Thread.sleep(300); + writeToKafka(); + } + } + +}