增加自定义source

This commit is contained in:
zeek 2020-05-05 14:54:58 +08:00
parent 01c707d62e
commit 9006e16095
6 changed files with 198 additions and 2 deletions

View File

@ -0,0 +1,19 @@
DROP TABLE IF EXISTS `Student`;
CREATE TABLE `Student` (
`id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`name` VARCHAR(25) COLLATE utf8_bin DEFAULT NULL,
`password` VARCHAR(25) COLLATE utf8_bin DEFAULT NULL,
`age` INT(10) DEFAULT NULL,
PRIMARY KEY (`id`)
)
ENGINE = InnoDB
AUTO_INCREMENT = 5
DEFAULT CHARSET = utf8
COLLATE = utf8_bin;
INSERT INTO `Student`
VALUES ('1', 'zhisheng01', '123456', '18'),
('2', 'zhisheng02', '123', '17'),
('3', 'zhisheng03', '1234', '18'),
('4', 'zhisheng04', '12345', '16');
COMMIT;

View File

@ -1,4 +1,4 @@
package com.thinker;
package com.thinker.main;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

View File

@ -0,0 +1,22 @@
package com.thinker.main;
import com.thinker.sql.SourceFromMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class FlinkCustomSource {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFromMySQL()).print();
env.execute("Flink add data sourc");
}
}

View File

@ -1,4 +1,4 @@
package com.thinker;
package com.thinker.main;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

View File

@ -0,0 +1,67 @@
package com.thinker.model;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote student 表的实体信息
* @since 2020-05-05
*/
public class Student {
private int id;
private String name;
private String password;
private int age;
public Student() {
}
public Student(int id, String name, String password, int age) {
this.id = id;
this.name = name;
this.password = password;
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", password='" + password + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}

View File

@ -0,0 +1,88 @@
package com.thinker.sql;
import com.thinker.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-05
*/
public class SourceFromMySQL extends RichSourceFunction<Student> {
private PreparedStatement ps;
private Connection connection;
/**
* open() 方法中建立连接这样不用每次 invoke 的时候都要建立连接和释放连接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "select * from Student;";
ps = this.connection.prepareStatement(sql);
}
/**
* 程序执行完毕就可以进行关闭连接和释放资源的动作了
*
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
if (connection != null) { //关闭连接和释放资源
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* DataStream 调用一次 run() 方法用来获取数据
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Student> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
Student student = new Student(
resultSet.getInt("id"),
resultSet.getString("name").trim(),
resultSet.getString("password").trim(),
resultSet.getInt("age"));
ctx.collect(student);
}
}
@Override
public void cancel() {
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName("com.mysql.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
}
return con;
}
}