当前位置:实例文章 » C#开发实例» [文章]Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

发布人:shili8 发布时间:2024-11-03 21:17 阅读次数:0

**Kafka 入门与 Offset 提交**

Apache Kafka 是一个分布式流处理平台,它可以实时处理大量数据。Kafka 的核心概念是主题(Topic)、分区(Partition)和偏移量(Offset)。在本文中,我们将介绍 Kafka 的基本概念,手动提交 Offset、同步提交和异步提交,以及指定 Offset 消费。

###1. Kafka 基础概念####1.1 主题(Topic)

主题是 Kafka 中的一个关键概念,它代表了一个数据流。每个主题可以分成多个分区,每个分区是一个独立的数据流。

####1.2 分区(Partition)

分区是主题中的一部分,它代表了一个独立的数据流。每个分区有自己的偏移量,用于跟踪消费者已经处理过的消息。

####1.3 偏移量(Offset)

偏移量是 Kafka 中的一个关键概念,它代表了消费者已经处理过的消息的位置。每个分区都有自己的偏移量,每次消费者处理一条消息时,偏移量都会增加。

###2. 手动提交 Offset手动提交 Offset 是一种在 Kafka 中直接控制 Offset 的方式。在这种模式下,消费者需要手动将 Offset 提交到 Kafka 中,以便 Kafka 知道消费者已经处理过的消息。

####2.1 手动提交 Offset 的示例代码

javaimport org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class ManualOffsetCommitExample {
 public static void main(String[] args) {
 // Kafka 配置 Properties props = new Properties();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

 // 消费者配置 KafkaConsumer consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("my-topic"));

 while (true) {
 ConsumerRecords records = consumer.poll(100);

 for (ConsumerRecord record : records) {
 System.out.println(record.value());

 // 手动提交 Offset consumer.commitSync();
 }
 }
 }
}

###3. 同步提交同步提交是 Kafka 中的一种 Offset 提交方式。在这种模式下,消费者会等待 Offset 被提交到 Kafka 中,然后才继续处理下一条消息。

####3.1 同步提交的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class SyncOffsetCommitExample {
 public static void main(String[] args) {
 // Kafka 配置 Properties props = new Properties();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

 // 消费者配置 KafkaConsumer consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("my-topic"));

 while (true) {
 ConsumerRecords records = consumer.poll(100);

 for (ConsumerRecord record : records) {
 System.out.println(record.value());

 // 同步提交 Offset consumer.commitSync();
 }
 }
 }
}

###4. 异步提交异步提交是 Kafka 中的一种 Offset 提交方式。在这种模式下,消费者会将 Offset 提交到 Kafka 中,然后继续处理下一条消息。

####4.1 异步提交的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class AsyncOffsetCommitExample {
 public static void main(String[] args) {
 // Kafka 配置 Properties props = new Properties();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

 // 消费者配置 KafkaConsumer consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("my-topic"));

 while (true) {
 ConsumerRecords records = consumer.poll(100);

 for (ConsumerRecord record : records) {
 System.out.println(record.value());

 // 异步提交 Offset consumer.commitAsync();
 }
 }
 }
}

###5. 指定 Offset 消费指定 Offset 消费是 Kafka 中的一种特定方式,消费者会从指定的 Offset 开始消费消息。

####5.1 指定 Offset 消费的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class SpecifyOffsetExample {
 public static void main(String[] args) {
 // Kafka 配置 Properties props = new Properties();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

 // 消费者配置 KafkaConsumer consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("my-topic"));

 // 指定 Offset 开始消费 long offset =100;
 consumer.seek(offset);

 while (true) {
 ConsumerRecords records = consumer.poll(100);

 for (ConsumerRecord record : records) {
 System.out.println(record.value());
 }
 }
 }
}

本文介绍了 Kafka 的基本概念,手动提交 Offset、同步提交和异步提交,以及指定 Offset 消费。通过这些示例代码,你可以更好地理解 Kafka 的使用方式,并在实际项目中应用它们。

相关标签:c#linqkafka分布式
其他信息

其他资源

Top