一篇文章带你从入门都入土 Kafka 消息中间件(原理+代码)
发布人:shili8
发布时间:2024-05-25 08:18
阅读次数:0
Kafka 是一个分布式的消息中间件系统,由 LinkedIn 公司开发并开源。它具有高吞吐量、低延迟、高可靠性等特点,被广泛应用于大数据领域。本文将带你从入门到入土 Kafka 消息中间件,介绍其原理和代码示例。
### Kafka 原理介绍Kafka 的核心概念包括 Producer(生产者)、Consumer(消费者)、Broker(代理服务器)和 Topic(主题)。Producer 负责向 Kafka 集群发送消息,Consumer 负责从 Kafka 集群消费消息,Broker 是 Kafka 集群中的服务器节点,Topic 是消息的分类。
Kafka 的消息存储是基于日志的,每个 Topic 包含多个 Partition(分区),每个 Partition 又包含多个 Segment(段)。Producer 发送的消息被追加到 Partition 的当前 Segment 中,Consumer从 Partition 的当前 Offset(偏移量)开始消费消息。
Kafka 的消息传递是基于发布-订阅模式的,Producer 发送消息到 Topic,Consumer 订阅 Topic 并消费消息。Kafka 支持多个 Consumer Group(消费者组),每个 Consumer Group 中的 Consumer 可以并行消费消息。
### Kafka 环境搭建首先,你需要下载 Kafka 并解压到本地。然后,进入 Kafka 目录,启动 Zookeeper 和 Kafka服务器:
bashbin/zookeeper-server-start.sh config/zookeeper.propertiesbin/kafka-server-start.sh config/server.properties
接着,创建一个 Topic:
bashbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor1 --partitions1 --topic test
### Kafka 生产者示例下面是一个简单的 Kafka 生产者示例,使用 Java语言编写:
javaimport org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducerproducer = new KafkaProducer<>(props); ProducerRecord record = new ProducerRecord<>("test", "key", "Hello, Kafka!"); producer.send(record); producer.close(); } }
### Kafka 消费者示例下面是一个简单的 Kafka 消费者示例,使用 Java语言编写:
javaimport org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }); } } }
### 总结通过本文的介绍,你应该对 Kafka 消息中间件有了初步的了解。Kafka 的原理和使用方法并不复杂,只要掌握了核心概念和基本操作,就可以开始在实际项目中应用 Kafka了。希望本文能够帮助你快速入门 Kafka,并在实践中不断深入学习和应用。