38 lines
1.4 KiB
Java
38 lines
1.4 KiB
Java
package com.thinker.kafka.consumer;
|
||
|
||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||
|
||
import java.util.Collections;
|
||
import java.util.Properties;
|
||
|
||
/**
|
||
* @author zeekling [lingzhaohui@zeekling.cn]
|
||
* @version 1.0
|
||
* @apiNote
|
||
* @since 2020-05-17
|
||
*/
|
||
public class CustomConsumer {
|
||
|
||
public static void main(String[] args) {
|
||
Properties props = new Properties();
|
||
props.put("bootstrap.servers", "localhost:9092");
|
||
props.put("group.id", "test");//消费者组,只要group.id相同,就属于同一个消费者组
|
||
props.put("enable.auto.commit", "false");//自动提交offset
|
||
// props.put("auto.commit.interval.ms", "1000");
|
||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||
consumer.subscribe(Collections.singletonList("first"));
|
||
while (true) {
|
||
ConsumerRecords<String, String> records = consumer.poll(100);
|
||
for (ConsumerRecord<String, String> record : records) {
|
||
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
|
||
}
|
||
consumer.commitSync();
|
||
}
|
||
}
|
||
|
||
}
|