diff --git a/sql/custom_data_source.sql b/sql/custom_data_source.sql new file mode 100644 index 0000000..00ce685 --- /dev/null +++ b/sql/custom_data_source.sql @@ -0,0 +1,19 @@ +DROP TABLE IF EXISTS `Student`; +CREATE TABLE `Student` ( + `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, + `name` VARCHAR(25) COLLATE utf8_bin DEFAULT NULL, + `password` VARCHAR(25) COLLATE utf8_bin DEFAULT NULL, + `age` INT(10) DEFAULT NULL, + PRIMARY KEY (`id`) +) + ENGINE = InnoDB + AUTO_INCREMENT = 5 + DEFAULT CHARSET = utf8 + COLLATE = utf8_bin; + +INSERT INTO `Student` +VALUES ('1', 'zhisheng01', '123456', '18'), + ('2', 'zhisheng02', '123', '17'), + ('3', 'zhisheng03', '1234', '18'), + ('4', 'zhisheng04', '12345', '16'); +COMMIT; diff --git a/src/main/java/com/thinker/App.java b/src/main/java/com/thinker/main/App.java similarity index 98% rename from src/main/java/com/thinker/App.java rename to src/main/java/com/thinker/main/App.java index e469bfc..1d9bf46 100644 --- a/src/main/java/com/thinker/App.java +++ b/src/main/java/com/thinker/main/App.java @@ -1,4 +1,4 @@ -package com.thinker; +package com.thinker.main; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/src/main/java/com/thinker/main/FlinkCustomSource.java b/src/main/java/com/thinker/main/FlinkCustomSource.java new file mode 100644 index 0000000..8965e2b --- /dev/null +++ b/src/main/java/com/thinker/main/FlinkCustomSource.java @@ -0,0 +1,22 @@ +package com.thinker.main; + +import com.thinker.sql.SourceFromMySQL; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote + * @since 2020-05-05 + */ +public class FlinkCustomSource { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.addSource(new SourceFromMySQL()).print(); + + env.execute("Flink add data sourc"); + } + +} diff --git a/src/main/java/com/thinker/Main.java b/src/main/java/com/thinker/main/Main.java similarity index 98% rename from src/main/java/com/thinker/Main.java rename to src/main/java/com/thinker/main/Main.java index 146189c..48d9901 100644 --- a/src/main/java/com/thinker/Main.java +++ b/src/main/java/com/thinker/main/Main.java @@ -1,4 +1,4 @@ -package com.thinker; +package com.thinker.main; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; diff --git a/src/main/java/com/thinker/model/Student.java b/src/main/java/com/thinker/model/Student.java new file mode 100644 index 0000000..394e383 --- /dev/null +++ b/src/main/java/com/thinker/model/Student.java @@ -0,0 +1,67 @@ +package com.thinker.model; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote student 表的实体信息 + * @since 2020-05-05 + */ +public class Student { + + private int id; + private String name; + private String password; + private int age; + + public Student() { + } + + public Student(int id, String name, String password, int age) { + this.id = id; + this.name = name; + this.password = password; + this.age = age; + } + + @Override + public String toString() { + return "Student{" + + "id=" + id + + ", name='" + name + '\'' + + ", password='" + password + '\'' + + ", age=" + age + + '}'; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } +} diff --git a/src/main/java/com/thinker/sql/SourceFromMySQL.java b/src/main/java/com/thinker/sql/SourceFromMySQL.java new file mode 100644 index 0000000..0ac5acf --- /dev/null +++ b/src/main/java/com/thinker/sql/SourceFromMySQL.java @@ -0,0 +1,88 @@ +package com.thinker.sql; + +import com.thinker.model.Student; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote + * @since 2020-05-05 + */ +public class SourceFromMySQL extends RichSourceFunction { + + + private PreparedStatement ps; + private Connection connection; + + /** + * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。 + * + * @param parameters + * @throws Exception + */ + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + connection = getConnection(); + String sql = "select * from Student;"; + ps = this.connection.prepareStatement(sql); + } + + /** + * 程序执行完毕就可以进行,关闭连接和释放资源的动作了 + * + * @throws Exception + */ + @Override + public void close() throws Exception { + super.close(); + if (connection != null) { //关闭连接和释放资源 + connection.close(); + } + if (ps != null) { + ps.close(); + } + } + + /** + * DataStream 调用一次 run() 方法用来获取数据 + * + * @param ctx + * @throws Exception + */ + @Override + public void run(SourceContext ctx) throws Exception { + ResultSet resultSet = ps.executeQuery(); + while (resultSet.next()) { + Student student = new Student( + resultSet.getInt("id"), + resultSet.getString("name").trim(), + resultSet.getString("password").trim(), + resultSet.getInt("age")); + ctx.collect(student); + } + } + + @Override + public void cancel() { + } + + private static Connection getConnection() { + Connection con = null; + try { + Class.forName("com.mysql.jdbc.Driver"); + con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); + } catch (Exception e) { + System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage()); + } + return con; + } +}