当前位置:实例文章 » 其他实例» [文章]MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )

MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )

发布人:shili8 发布时间:2025-02-25 21:35 阅读次数:0

**闲聊MQ**

在分布式系统中,消息队列(Message Queue)是一个非常重要的组件,它可以帮助我们解耦各个服务之间的依赖关系,使得系统更加灵活和高效。今天,我们要讨论的是三种比较流行的消息队列系统:Kafka、RocketMQ 和 Pulsar。

### 一、Kafka####1. 简介Apache Kafka 是一个分布式流数据平台,它最初是由 LinkedIn 公司内部使用,后来开源给 Apache 基金会。Kafka 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。

####2. 架构Kafka 的架构非常简单,主要包括以下几个组件:

* **Broker**: Kafka 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。每个 Topic 有自己的分区(Partition)。
* **Partition**: Partition 是一个物理上的消息存储单元,每个 Partition 可以分布在多个 Broker 上。

####3. 特点Kafka 的一些特点包括:

* **高吞吐量**: Kafka 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: Kafka 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: Kafka 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。

####4. 使用示例下面是一个简单的 Kafka 示例,使用 Python 的 `kafka-python` 库来生产和消费消息:

from kafka import KafkaProducer, KafkaConsumer# 生产者配置producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 消费者配置consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# 生产消息producer.send('my_topic', value=b'Hello, world!')

# 消费消息for message in consumer:
 print(message.value.decode())


### 二、RocketMQ####1. 简介Apache RocketMQ 是一个分布式消息队列系统,最初是由 Alibaba 公司内部使用,后来开源给 Apache 基金会。RocketMQ 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。

####2. 架构RocketMQ 的架构非常简单,主要包括以下几个组件:

* **NameServer**: RocketMQ 中的 NameServer 是负责维护 Broker 和 Topic信息的节点。
* **Broker**: RocketMQ 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。

####3. 特点RocketMQ 的一些特点包括:

* **高吞吐量**: RocketMQ 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: RocketMQ 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: RocketMQ 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。

####4. 使用示例下面是一个简单的 RocketMQ 示例,使用 Java 的 `rocketmq-client` 库来生产和消费消息:

javaimport org.apache.rocketmq.client.consumer.DefaultConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class RocketMQExample {
 public static void main(String[] args) throws Exception {
 // 生产者配置 DefaultMQProducer producer = new DefaultMQProducer("my_group");
 producer.setNamesrvAddr("localhost:9876");

 // 消费者配置 DefaultConsumer consumer = new DefaultConsumer("my_group", "my_topic");
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override public void onMessage(List msgs, ConsumeOrderlyContext context) {
 for (MessageExt message : msgs) {
 System.out.println(message.getBody());
 }
 }
 });

 // 生产消息 producer.start();
 producer.send(new Message("my_topic", "Hello, world!".getBytes()));

 // 消费消息 consumer.start();
 }
}


### 三、Pulsar####1. 简介Apache Pulsar 是一个分布式流数据平台,最初是由 Yahoo! 公司内部使用,后来开源给 Apache 基金会。Pulsar 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。

####2. 架构Pulsar 的架构非常简单,主要包括以下几个组件:

* **Broker**: Pulsar 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。
* **Partition**: Partition 是一个物理上的消息存储单元,每个 Partition 可以分布在多个 Broker 上。

####3. 特点Pulsar 的一些特点包括:

* **高吞吐量**: Pulsar 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: Pulsar 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: Pulsar 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。

####4. 使用示例下面是一个简单的 Pulsar 示例,使用 Java 的 `pulsar-client` 库来生产和消费消息:

javaimport org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionName;

public class PulsarExample {
 public static void main(String[] args) throws Exception {
 // 生产者配置 PulsarClient client = PulsarClient.builder()
 .serviceHttpUrl(" /> .build();

 Producer producer = client.newProducer()
 .topic("my_topic")
 .createIfNotExists(true)
 .blockIfBusy(false)
 .build();

 // 消费者配置 Consumer consumer = client.newConsumer()
 .topic("my_topic")
 .subscriptionName(SubscriptionName.random())
 .subscriptionType(SubscriptionType.Exclusive)
 .enableBatching(true)
 .batchingMaxMessages(1000)
 .batchingMaxBytes(1024 *1024)
 .build();

 // 生产消息 producer.send("Hello, world!".getBytes());

 // 消费消息 consumer.subscribe();
 }
}


以上就是关于 Kafka、RocketMQ 和 Pulsar 的简单介绍和使用示例。这些消息队列系统都支持高吞吐量、低延迟和可扩展性,能够处理大量的日志和事件数据。

相关标签:rocketmqkafka分布式
其他信息

其他资源

Top