增加pom
This commit is contained in:
parent
f60481d7c5
commit
01c707d62e
49
pom.xml
49
pom.xml
@ -14,8 +14,10 @@
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<maven.compiler.source>1.7</maven.compiler.source>
|
||||
<maven.compiler.target>1.7</maven.compiler.target>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
<flink.version>1.10.0</flink.version>
|
||||
<scala.binary.version>2.11</scala.binary.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -23,17 +25,37 @@
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-java</artifactId>
|
||||
<version>1.10.0</version>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_2.11</artifactId>
|
||||
<version>1.10.0</version>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_2.11</artifactId>
|
||||
<version>1.10.0</version>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!--日志-->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.7</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<version>1.2.17</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<!--flink kafka connector-->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
@ -47,6 +69,19 @@
|
||||
<version>2.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!--alibaba fastjson-->
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.51</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>5.1.34</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
39
src/main/java/com/thinker/Main.java
Normal file
39
src/main/java/com/thinker/Main.java
Normal file
@ -0,0 +1,39 @@
|
||||
package com.thinker;
|
||||
|
||||
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author zeekling [lingzhaohui@zeekling.cn]
|
||||
* @version 1.0
|
||||
* @apiNote
|
||||
* @since 2020-05-05
|
||||
*/
|
||||
public class Main {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("zookeeper.connect", "localhost:2181");
|
||||
props.put("group.id", "metric-group");
|
||||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
|
||||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("auto.offset.reset", "latest"); //value 反序列化
|
||||
|
||||
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
|
||||
"metric", //kafka topic
|
||||
new SimpleStringSchema(), // String 序列化
|
||||
props)).setParallelism(1);
|
||||
|
||||
dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台
|
||||
|
||||
env.execute("Flink add data source");
|
||||
}
|
||||
|
||||
}
|
71
src/main/java/com/thinker/model/Metric.java
Normal file
71
src/main/java/com/thinker/model/Metric.java
Normal file
@ -0,0 +1,71 @@
|
||||
package com.thinker.model;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author zeekling [lingzhaohui@zeekling.cn]
|
||||
* @version 1.0
|
||||
* @apiNote 实体类
|
||||
* @since 2020-05-05
|
||||
*/
|
||||
public class Metric {
|
||||
|
||||
|
||||
private String name;
|
||||
private long timestamp;
|
||||
private Map<String, Object> fields;
|
||||
private Map<String, String> tags;
|
||||
|
||||
|
||||
public Metric() {
|
||||
}
|
||||
|
||||
public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {
|
||||
this.name = name;
|
||||
this.timestamp = timestamp;
|
||||
this.fields = fields;
|
||||
this.tags = tags;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Metric{" +
|
||||
"name='" + name + '\'' +
|
||||
", timestamp='" + timestamp + '\'' +
|
||||
", fields=" + fields +
|
||||
", tags=" + tags +
|
||||
'}';
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public Map<String, Object> getFields() {
|
||||
return fields;
|
||||
}
|
||||
|
||||
public void setFields(Map<String, Object> fields) {
|
||||
this.fields = fields;
|
||||
}
|
||||
|
||||
public Map<String, String> getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
public void setTags(Map<String, String> tags) {
|
||||
this.tags = tags;
|
||||
}
|
||||
}
|
61
src/main/java/com/thinker/util/KafkaUtils.java
Normal file
61
src/main/java/com/thinker/util/KafkaUtils.java
Normal file
@ -0,0 +1,61 @@
|
||||
package com.thinker.util;
|
||||
|
||||
import com.thinker.model.Metric;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author zeekling [lingzhaohui@zeekling.cn]
|
||||
* @version 1.0
|
||||
* @apiNote 往 kafka 中写数据工具类:
|
||||
* @since 2020-05-05
|
||||
*/
|
||||
public class KafkaUtils {
|
||||
|
||||
private static final String broker_list = "localhost:9092";
|
||||
private static final String topic = "metric"; // kafka topic,Flink 程序中需要和这个统一
|
||||
|
||||
private static void writeToKafka() throws InterruptedException {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", broker_list);
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
|
||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
|
||||
KafkaProducer producer = new KafkaProducer<String, String>(props);
|
||||
|
||||
Metric metric = new Metric();
|
||||
metric.setTimestamp(System.currentTimeMillis());
|
||||
metric.setName("mem");
|
||||
Map<String, String> tags = new HashMap<>();
|
||||
Map<String, Object> fields = new HashMap<>();
|
||||
|
||||
tags.put("cluster", "zhisheng");
|
||||
tags.put("host_ip", "101.147.022.106");
|
||||
|
||||
fields.put("used_percent", 90d);
|
||||
fields.put("max", 27244873d);
|
||||
fields.put("used", 17244873d);
|
||||
fields.put("init", 27244873d);
|
||||
|
||||
metric.setTags(tags);
|
||||
metric.setFields(fields);
|
||||
|
||||
ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(metric));
|
||||
producer.send(record);
|
||||
System.out.println("发送数据: " + JSON.toJSONString(metric));
|
||||
|
||||
producer.flush();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
while (true) {
|
||||
Thread.sleep(300);
|
||||
writeToKafka();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user