Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)
发布人:shili8
发布时间:2024-11-03 21:17
阅读次数:0
**Kafka 入门与 Offset 提交**
Apache Kafka 是一个分布式流处理平台,它可以实时处理大量数据。Kafka 的核心概念是主题(Topic)、分区(Partition)和偏移量(Offset)。在本文中,我们将介绍 Kafka 的基本概念,手动提交 Offset、同步提交和异步提交,以及指定 Offset 消费。
###1. Kafka 基础概念####1.1 主题(Topic)
主题是 Kafka 中的一个关键概念,它代表了一个数据流。每个主题可以分成多个分区,每个分区是一个独立的数据流。
####1.2 分区(Partition)
分区是主题中的一部分,它代表了一个独立的数据流。每个分区有自己的偏移量,用于跟踪消费者已经处理过的消息。
####1.3 偏移量(Offset)
偏移量是 Kafka 中的一个关键概念,它代表了消费者已经处理过的消息的位置。每个分区都有自己的偏移量,每次消费者处理一条消息时,偏移量都会增加。
###2. 手动提交 Offset手动提交 Offset 是一种在 Kafka 中直接控制 Offset 的方式。在这种模式下,消费者需要手动将 Offset 提交到 Kafka 中,以便 Kafka 知道消费者已经处理过的消息。
####2.1 手动提交 Offset 的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class ManualOffsetCommitExample { public static void main(String[] args) { // Kafka 配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费者配置 KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.value()); // 手动提交 Offset consumer.commitSync(); } } } }
###3. 同步提交同步提交是 Kafka 中的一种 Offset 提交方式。在这种模式下,消费者会等待 Offset 被提交到 Kafka 中,然后才继续处理下一条消息。
####3.1 同步提交的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class SyncOffsetCommitExample { public static void main(String[] args) { // Kafka 配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费者配置 KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.value()); // 同步提交 Offset consumer.commitSync(); } } } }
###4. 异步提交异步提交是 Kafka 中的一种 Offset 提交方式。在这种模式下,消费者会将 Offset 提交到 Kafka 中,然后继续处理下一条消息。
####4.1 异步提交的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class AsyncOffsetCommitExample { public static void main(String[] args) { // Kafka 配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费者配置 KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.value()); // 异步提交 Offset consumer.commitAsync(); } } } }
###5. 指定 Offset 消费指定 Offset 消费是 Kafka 中的一种特定方式,消费者会从指定的 Offset 开始消费消息。
####5.1 指定 Offset 消费的示例代码
javaimport org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class SpecifyOffsetExample { public static void main(String[] args) { // Kafka 配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费者配置 KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); // 指定 Offset 开始消费 long offset =100; consumer.seek(offset); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.value()); } } } }
本文介绍了 Kafka 的基本概念,手动提交 Offset、同步提交和异步提交,以及指定 Offset 消费。通过这些示例代码,你可以更好地理解 Kafka 的使用方式,并在实际项目中应用它们。