diff --git a/pom.xml b/pom.xml
index 6dbff2e..277e7e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,8 +16,7 @@
UTF-8
1.8
1.8
- 1.10.0
- 2.11
+ 1.12.0
@@ -29,15 +28,23 @@
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-streaming-java_2.12
${flink.version}
+
+
org.apache.flink
- flink-clients_${scala.binary.version}
+ flink-clients_2.12
${flink.version}
+
+ org.scala-lang
+ scala-library
+ 2.12.0
+
+
org.slf4j
@@ -54,7 +61,7 @@
org.apache.flink
- flink-connector-kafka-0.11_${scala.binary.version}
+ flink-connector-kafka_2.12
${flink.version}
@@ -93,7 +100,7 @@
org.scala-lang
scala-library
- 2.10.6
+ 2.12.10
provided
diff --git a/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java
index d498051..be05917 100644
--- a/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java
+++ b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java
@@ -26,7 +26,7 @@ public class FlinkSinkToKafka {
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
- DataStreamSource student = env.addSource(new FlinkKafkaConsumer011<>(
+ DataStreamSource student = env.addSource(new FlinkKafkaConsumer<>(
READ_TOPIC, //这个 kafka topic 需要和上面的工具类的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1);
@@ -36,7 +36,7 @@ public class FlinkSinkToKafka {
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "student-write");
- student.addSink(new FlinkKafkaProducer011(
+ student.addSink(new FlinkKafkaProducer(
"student-write",
new SimpleStringSchema(),
properties
diff --git a/src/main/java/com/thinker/main/FlinkCustomSource.java b/src/main/java/com/thinker/main/FlinkCustomSource.java
index 8965e2b..32c2e95 100644
--- a/src/main/java/com/thinker/main/FlinkCustomSource.java
+++ b/src/main/java/com/thinker/main/FlinkCustomSource.java
@@ -16,7 +16,7 @@ public class FlinkCustomSource {
env.addSource(new SourceFromMySQL()).print();
- env.execute("Flink add data sourc");
+ env.execute("Flink add data source");
}
}
diff --git a/src/main/java/com/thinker/main/Main.java b/src/main/java/com/thinker/main/Main.java
index 48d9901..cb61b54 100644
--- a/src/main/java/com/thinker/main/Main.java
+++ b/src/main/java/com/thinker/main/Main.java
@@ -3,7 +3,7 @@ package com.thinker.main;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
@@ -26,7 +26,7 @@ public class Main {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest"); //value 反序列化
- DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
+ DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer<>(
"metric", //kafka topic
new SimpleStringSchema(), // String 序列化
props)).setParallelism(1);
diff --git a/src/main/java/com/thinker/main/SinkToMysql.java b/src/main/java/com/thinker/main/SinkToMysql.java
index 39a8fbd..5675e4d 100644
--- a/src/main/java/com/thinker/main/SinkToMysql.java
+++ b/src/main/java/com/thinker/main/SinkToMysql.java
@@ -6,7 +6,7 @@ import com.thinker.sql.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
@@ -29,7 +29,7 @@ public class SinkToMysql {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
- SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
+ SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer<>(
"student", //这个 kafka topic 需要和上面的工具类的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1)
diff --git a/src/main/java/com/thinker/model/Student.java b/src/main/java/com/thinker/model/Student.java
index c66a4be..46ced62 100644
--- a/src/main/java/com/thinker/model/Student.java
+++ b/src/main/java/com/thinker/model/Student.java
@@ -1,7 +1,5 @@
package com.thinker.model;
-import jdk.nashorn.internal.objects.annotations.Getter;
-import jdk.nashorn.internal.objects.annotations.Setter;
/**
* @author zeekling [lingzhaohui@zeekling.cn]