当前位置:实例文章 » C#开发实例» [文章]一篇文章带你从入门都入土 Kafka 消息中间件(原理+代码)

一篇文章带你从入门都入土 Kafka 消息中间件(原理+代码)

发布人:shili8 发布时间:2024-05-25 08:18 阅读次数:0

Kafka 是一个分布式的消息中间件系统,由 LinkedIn 公司开发并开源。它具有高吞吐量、低延迟、高可靠性等特点,被广泛应用于大数据领域。本文将带你从入门到入土 Kafka 消息中间件,介绍其原理和代码示例。

### Kafka 原理介绍Kafka 的核心概念包括 Producer(生产者)、Consumer(消费者)、Broker(代理服务器)和 Topic(主题)。Producer 负责向 Kafka 集群发送消息,Consumer 负责从 Kafka 集群消费消息,Broker 是 Kafka 集群中的服务器节点,Topic 是消息的分类。

Kafka 的消息存储是基于日志的,每个 Topic 包含多个 Partition(分区),每个 Partition 又包含多个 Segment(段)。Producer 发送的消息被追加到 Partition 的当前 Segment 中,Consumer从 Partition 的当前 Offset(偏移量)开始消费消息。

Kafka 的消息传递是基于发布-订阅模式的,Producer 发送消息到 Topic,Consumer 订阅 Topic 并消费消息。Kafka 支持多个 Consumer Group(消费者组),每个 Consumer Group 中的 Consumer 可以并行消费消息。

### Kafka 环境搭建首先,你需要下载 Kafka 并解压到本地。然后,进入 Kafka 目录,启动 Zookeeper 和 Kafka服务器:

bashbin/zookeeper-server-start.sh config/zookeeper.propertiesbin/kafka-server-start.sh config/server.properties


接着,创建一个 Topic:

bashbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor1 --partitions1 --topic test


### Kafka 生产者示例下面是一个简单的 Kafka 生产者示例,使用 Java语言编写:

javaimport org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 KafkaProducer producer = new KafkaProducer<>(props);

 ProducerRecord record = new ProducerRecord<>("test", "key", "Hello, Kafka!");

 producer.send(record);

 producer.close();
 }
}


### Kafka 消费者示例下面是一个简单的 Kafka 消费者示例,使用 Java语言编写:

javaimport org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test-group");
 props.put("key.deserializer", StringDeserializer.class.getName());
 props.put("value.deserializer", StringDeserializer.class.getName());

 KafkaConsumer consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Collections.singletonList("test"));

 while (true) {
 ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
 records.forEach(record -> {
 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 });
 }
 }
}


### 总结通过本文的介绍,你应该对 Kafka 消息中间件有了初步的了解。Kafka 的原理和使用方法并不复杂,只要掌握了核心概念和基本操作,就可以开始在实际项目中应用 Kafka了。希望本文能够帮助你快速入门 Kafka,并在实践中不断深入学习和应用。

相关标签:c#linqkafka分布式
其他信息

其他资源

Top