From 96bf3cdaa58b283fe1aba72d43160098daafd2f7 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 10 Oct 2021 23:10:13 +0800 Subject: [PATCH] update flink to 2.12 --- pom.xml | 19 +++++++++++++------ .../com/thinker/kafka/FlinkSinkToKafka.java | 4 ++-- .../com/thinker/main/FlinkCustomSource.java | 2 +- src/main/java/com/thinker/main/Main.java | 4 ++-- .../java/com/thinker/main/SinkToMysql.java | 4 ++-- src/main/java/com/thinker/model/Student.java | 2 -- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index 6dbff2e..277e7e7 100644 --- a/pom.xml +++ b/pom.xml @@ -16,8 +16,7 @@ UTF-8 1.8 1.8 - 1.10.0 - 2.11 + 1.12.0 @@ -29,15 +28,23 @@ org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java_2.12 ${flink.version} + + org.apache.flink - flink-clients_${scala.binary.version} + flink-clients_2.12 ${flink.version} + + org.scala-lang + scala-library + 2.12.0 + + org.slf4j @@ -54,7 +61,7 @@ org.apache.flink - flink-connector-kafka-0.11_${scala.binary.version} + flink-connector-kafka_2.12 ${flink.version} @@ -93,7 +100,7 @@ org.scala-lang scala-library - 2.10.6 + 2.12.10 provided diff --git a/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java index d498051..be05917 100644 --- a/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java +++ b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java @@ -26,7 +26,7 @@ public class FlinkSinkToKafka { props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); - DataStreamSource student = env.addSource(new FlinkKafkaConsumer011<>( + DataStreamSource student = env.addSource(new FlinkKafkaConsumer<>( READ_TOPIC, //这个 kafka topic 需要和上面的工具类的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1); @@ -36,7 +36,7 @@ public class FlinkSinkToKafka { properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "student-write"); - student.addSink(new FlinkKafkaProducer011( + student.addSink(new FlinkKafkaProducer( "student-write", new SimpleStringSchema(), properties diff --git a/src/main/java/com/thinker/main/FlinkCustomSource.java b/src/main/java/com/thinker/main/FlinkCustomSource.java index 8965e2b..32c2e95 100644 --- a/src/main/java/com/thinker/main/FlinkCustomSource.java +++ b/src/main/java/com/thinker/main/FlinkCustomSource.java @@ -16,7 +16,7 @@ public class FlinkCustomSource { env.addSource(new SourceFromMySQL()).print(); - env.execute("Flink add data sourc"); + env.execute("Flink add data source"); } } diff --git a/src/main/java/com/thinker/main/Main.java b/src/main/java/com/thinker/main/Main.java index 48d9901..cb61b54 100644 --- a/src/main/java/com/thinker/main/Main.java +++ b/src/main/java/com/thinker/main/Main.java @@ -3,7 +3,7 @@ package com.thinker.main; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; @@ -26,7 +26,7 @@ public class Main { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); //value 反序列化 - DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( + DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer<>( "metric", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); diff --git a/src/main/java/com/thinker/main/SinkToMysql.java b/src/main/java/com/thinker/main/SinkToMysql.java index 39a8fbd..5675e4d 100644 --- a/src/main/java/com/thinker/main/SinkToMysql.java +++ b/src/main/java/com/thinker/main/SinkToMysql.java @@ -6,7 +6,7 @@ import com.thinker.sql.SinkToMySQL; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; @@ -29,7 +29,7 @@ public class SinkToMysql { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); - SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>( + SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer<>( "student", //这个 kafka topic 需要和上面的工具类的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1) diff --git a/src/main/java/com/thinker/model/Student.java b/src/main/java/com/thinker/model/Student.java index c66a4be..46ced62 100644 --- a/src/main/java/com/thinker/model/Student.java +++ b/src/main/java/com/thinker/model/Student.java @@ -1,7 +1,5 @@ package com.thinker.model; -import jdk.nashorn.internal.objects.annotations.Getter; -import jdk.nashorn.internal.objects.annotations.Setter; /** * @author zeekling [lingzhaohui@zeekling.cn]