Flink writes data to Kafka

zeek 2020-05-15 00:47:18 +08:00
parent 867013f740
commit adf3bab109
6 changed files with 84 additions and 20 deletions

README.md

@@ -6,3 +6,4 @@
- [How to write a custom Data Source](https://www.zeekling.cn/articles/2020/05/04/1588559095224.html)
- [Introduction to Data Sinks](https://www.zeekling.cn/articles/2020/05/04/1588666906660.html)
- [Writing a custom Data Sink](https://www.zeekling.cn/articles/2020/05/05/1588680092763.html)
+ - [Flink writes data to Kafka](https://www.zeekling.cn/articles/2020/05/15/1589474545288.html)

pom.xml

@@ -90,6 +90,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.6</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -98,10 +105,31 @@
</dependency>
</dependencies>
<build>
+ <pluginManagement><!-- lock down plugin versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <type>jar</type>
+ <includeTypes>jar</includeTypes>
+ <outputDirectory>target/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<!-- This plugin compiles Scala code into class files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<plugin>
<groupId>net.alchim31.maven</groupId>
@@ -120,25 +148,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
+ <artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
+ </pluginManagement>
</build>
</project>

MANIFEST.MF

@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: com.thinker.kafka.FlinkSinkToKafka
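Taken together with the pom changes above, this commit moves the build from an assembly "jar-with-dependencies" fat jar to a thin jar: maven-jar-plugin packages the classes, maven-dependency-plugin copies the runtime dependencies to target/lib, and this manifest is evidently intended to supply the entry point. The resulting jar could presumably be submitted with the standard Flink CLI, e.g. `flink run -c com.thinker.kafka.FlinkSinkToKafka target/<artifact>.jar`, where the artifact name depends on this pom's coordinates, which the diff does not show.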

FlinkSinkToKafka.java

@@ -0,0 +1,48 @@
package com.thinker.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.*;

import java.util.Properties;

/**
 * Reads strings from one Kafka topic and writes them back out to Kafka.
 *
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @since 2020-05-14
 */
public class FlinkSinkToKafka {

    private static final String READ_TOPIC = "student-write";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // zookeeper.connect and the deserializer settings below are only used by
        // older consumer APIs; FlinkKafkaConsumer011 applies its own byte-array
        // deserializers internally.
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                READ_TOPIC, // this Kafka topic must match the topic used by the KafkaUtils2 utility class
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "student-write");

        // Note: the producer writes to the same topic the job reads from, so every
        // record is consumed and re-emitted in a loop; use a separate output topic
        // if that is not intended.
        student.addSink(new FlinkKafkaProducer011<String>(
                "student-write",
                new SimpleStringSchema(),
                properties
        )).name("flink-connectors-kafka").setParallelism(1);

        env.execute("flink learning connectors kafka");
    }
}
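
The sink can be spot-checked from outside Flink. Below is a minimal sketch of a plain Kafka consumer that tails the student-write topic; it is not part of this commit, the class name VerifyKafkaSink is made up, and it assumes a kafka-clients dependency from the 0.11 line to match the connector (hence the long-based poll).

package com.thinker.kafka;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Hypothetical helper, not part of this commit: prints whatever the Flink job
 * above produces to the student-write topic.
 */
public class VerifyKafkaSink {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "student-verify"); // separate group, so the Flink job's offsets are untouched
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("student-write"));
            while (true) {
                // poll(long) is the signature available in the 0.11 client line
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}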

App.java

@@ -26,7 +26,7 @@ public class App {
}
public static void main(String[] args) throws Exception {
- System.out.println("H ello World!");
+ System.out.println("Hello World!");
if (args.length != 2) {
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
return;

KafkaUtils2.java

@@ -17,7 +17,7 @@ public class KafkaUtils2 {
private static final String broker_list = "localhost:9092";
- private static final String topic = "student"; // the Kafka topic must be the same one used by the Flink program
+ private static final String topic = "student-write"; // the Kafka topic must be the same one used by the Flink program
private static void writeToKafka() throws InterruptedException {
Properties props = new Properties();