增加sinktomysql

This commit is contained in:
zeek 2020-05-05 20:45:45 +08:00
parent 48eda8f46d
commit 384306a1d2
5 changed files with 173 additions and 0 deletions

View File

@ -82,6 +82,14 @@
<version>5.1.34</version> <version>5.1.34</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -0,0 +1,43 @@
package com.thinker.main;
import com.thinker.model.Student;
import com.alibaba.fastjson.JSON;
import com.thinker.sql.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class SinkToMysql {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
"student", //这个 kafka topic 需要和上面的工具类的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1)
.map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象
student.addSink(new SinkToMySQL()); //数据 sink mysql
env.execute("Flink add sink");
}
}

View File

@ -1,5 +1,8 @@
package com.thinker.model; package com.thinker.model;
import jdk.nashorn.internal.objects.annotations.Getter;
import jdk.nashorn.internal.objects.annotations.Setter;
/** /**
* @author zeekling [lingzhaohui@zeekling.cn] * @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0 * @version 1.0

View File

@ -0,0 +1,77 @@
package com.thinker.sql;
import com.thinker.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class SinkToMySQL extends RichSinkFunction<Student> {
private PreparedStatement ps;
private Connection connection;
/**
* open() 方法中建立连接这样不用每次 invoke 的时候都要建立连接和释放连接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
//关闭连接和释放资源
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Student value, Context context) throws Exception {
//组装数据执行插入操作
ps.setInt(1, value.getId());
ps.setString(2, value.getName());
ps.setString(3, value.getPassword());
ps.setInt(4, value.getAge());
ps.executeUpdate();
System.out.println("sink to mysql");
}
private static Connection getConnection() {
Connection con = null;
try {
con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
}
return con;
}
}

View File

@ -0,0 +1,42 @@
package com.thinker.util;
import com.thinker.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSON;
import java.util.Properties;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class KafkaUtils2 {
private static final String broker_list = "localhost:9092";
private static final String topic = "student"; //kafka topic 需要和 flink 程序用同一个 topic
private static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(props);
for (int i = 1; i <= 200; i++) {
Student student = new Student(i, "baiyu" + i, "password" + i, 18 + i);
ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
producer.send(record);
System.out.println("发送数据: " + JSON.toJSONString(student));
}
producer.flush();
}
public static void main(String[] args) throws InterruptedException {
writeToKafka();
}
}