From adf3bab1094e90709e16aec4d3b27b4760d59001 Mon Sep 17 00:00:00 2001 From: zeek <984294471@qq.com> Date: Fri, 15 May 2020 00:47:18 +0800 Subject: [PATCH] =?UTF-8?q?Flink=20=E5=86=99=E5=85=A5=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=88=B0=20Kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + pom.xml | 48 ++++++++++++------- src/main/java/META-INF/MANIFEST.MF | 3 ++ .../com/thinker/kafka/FlinkSinkToKafka.java | 48 +++++++++++++++++++ src/main/java/com/thinker/main/App.java | 2 +- .../java/com/thinker/util/KafkaUtils2.java | 2 +- 6 files changed, 84 insertions(+), 20 deletions(-) create mode 100644 src/main/java/META-INF/MANIFEST.MF create mode 100644 src/main/java/com/thinker/kafka/FlinkSinkToKafka.java diff --git a/README.md b/README.md index 760726a..4a349db 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,4 @@ - [如何自定义 Data Source](https://www.zeekling.cn/articles/2020/05/04/1588559095224.html) - [Data Sink 介绍](https://www.zeekling.cn/articles/2020/05/04/1588666906660.html) - [自定义Data Sink](https://www.zeekling.cn/articles/2020/05/05/1588680092763.html) +- [Flink 写入数据到 Kafka](https://www.zeekling.cn/articles/2020/05/15/1589474545288.html) diff --git a/pom.xml b/pom.xml index 4d8166f..6dbff2e 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,13 @@ provided + + org.scala-lang + scala-library + 2.10.6 + provided + + junit junit @@ -98,10 +105,31 @@ + - + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + jar + jar + target/lib + false + true + + + + + net.alchim31.maven @@ -120,25 +148,9 @@ org.apache.maven.plugins - maven-assembly-plugin - 3.0.0 - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - + maven-jar-plugin - diff --git a/src/main/java/META-INF/MANIFEST.MF b/src/main/java/META-INF/MANIFEST.MF new file mode 100644 index 0000000..20ef52d --- /dev/null +++ b/src/main/java/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: com.thinker.kafka.FlinkSinkToKafka + diff --git a/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java new file mode 100644 index 0000000..d498051 --- /dev/null +++ b/src/main/java/com/thinker/kafka/FlinkSinkToKafka.java @@ -0,0 +1,48 @@ +package com.thinker.kafka; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.*; + +import java.util.Properties; + +/** + * @author zeekling [lingzhaohui@zeekling.cn] + * @version 1.0 + * @apiNote + * @since 2020-05-14 + */ +public class FlinkSinkToKafka { + + private static final String READ_TOPIC = "student-write"; + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("zookeeper.connect", "localhost:2181"); + props.put("group.id", "student-group"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("auto.offset.reset", "latest"); + DataStreamSource student = env.addSource(new FlinkKafkaConsumer011<>( + READ_TOPIC, //这个 kafka topic 需要和上面的工具类的 topic 一致 + new SimpleStringSchema(), + props)).setParallelism(1); + student.print(); + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "localhost:9092"); + properties.setProperty("zookeeper.connect", "localhost:2181"); + properties.setProperty("group.id", "student-write"); + + student.addSink(new FlinkKafkaProducer011( + "student-write", + new SimpleStringSchema(), + properties + )).name("flink-connectors-kafka").setParallelism(1); + student.print(); + env.execute("flink learning connectors kafka"); + } + +} diff --git a/src/main/java/com/thinker/main/App.java b/src/main/java/com/thinker/main/App.java index 1d9bf46..977d205 100644 --- a/src/main/java/com/thinker/main/App.java +++ b/src/main/java/com/thinker/main/App.java @@ -26,7 +26,7 @@ public class App { } public static void main(String[] args) throws Exception { - System.out.println("H ello World!"); + System.out.println("Hello World!"); if (args.length != 2) { System.err.println("USAGE:\nSocketTextStreamWordCount "); return; diff --git a/src/main/java/com/thinker/util/KafkaUtils2.java b/src/main/java/com/thinker/util/KafkaUtils2.java index debc68e..1518551 100644 --- a/src/main/java/com/thinker/util/KafkaUtils2.java +++ b/src/main/java/com/thinker/util/KafkaUtils2.java @@ -17,7 +17,7 @@ public class KafkaUtils2 { private static final String broker_list = "localhost:9092"; - private static final String topic = "student"; //kafka topic 需要和 flink 程序用同一个 topic + private static final String topic = "student-write"; //kafka topic 需要和 flink 程序用同一个 topic private static void writeToKafka() throws InterruptedException { Properties props = new Properties();