MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )
发布人:shili8
发布时间:2025-02-25 21:35
阅读次数:0
**闲聊MQ**
在分布式系统中,消息队列(Message Queue)是一个非常重要的组件,它可以帮助我们解耦各个服务之间的依赖关系,使得系统更加灵活和高效。今天,我们要讨论的是三种比较流行的消息队列系统:Kafka、RocketMQ 和 Pulsar。
### 一、Kafka####1. 简介Apache Kafka 是一个分布式流数据平台,它最初是由 LinkedIn 公司内部使用,后来开源给 Apache 基金会。Kafka 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。
####2. 架构Kafka 的架构非常简单,主要包括以下几个组件:
* **Broker**: Kafka 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。每个 Topic 有自己的分区(Partition)。
* **Partition**: Partition 是一个物理上的消息存储单元,每个 Partition 可以分布在多个 Broker 上。
####3. 特点Kafka 的一些特点包括:
* **高吞吐量**: Kafka 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: Kafka 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: Kafka 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。
####4. 使用示例下面是一个简单的 Kafka 示例,使用 Python 的 `kafka-python` 库来生产和消费消息:
from kafka import KafkaProducer, KafkaConsumer# 生产者配置producer = KafkaProducer(bootstrap_servers='localhost:9092') # 消费者配置consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092') # 生产消息producer.send('my_topic', value=b'Hello, world!') # 消费消息for message in consumer: print(message.value.decode())
### 二、RocketMQ####1. 简介Apache RocketMQ 是一个分布式消息队列系统,最初是由 Alibaba 公司内部使用,后来开源给 Apache 基金会。RocketMQ 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。
####2. 架构RocketMQ 的架构非常简单,主要包括以下几个组件:
* **NameServer**: RocketMQ 中的 NameServer 是负责维护 Broker 和 Topic信息的节点。
* **Broker**: RocketMQ 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。
####3. 特点RocketMQ 的一些特点包括:
* **高吞吐量**: RocketMQ 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: RocketMQ 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: RocketMQ 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。
####4. 使用示例下面是一个简单的 RocketMQ 示例,使用 Java 的 `rocketmq-client` 库来生产和消费消息:
javaimport org.apache.rocketmq.client.consumer.DefaultConsumer; import org.apache.rocketmq.client.consumer.MessageListenerConcurrently; import org.apache.rocketmq.client.producer.DefaultMQProducer; public class RocketMQExample { public static void main(String[] args) throws Exception { // 生产者配置 DefaultMQProducer producer = new DefaultMQProducer("my_group"); producer.setNamesrvAddr("localhost:9876"); // 消费者配置 DefaultConsumer consumer = new DefaultConsumer("my_group", "my_topic"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public void onMessage(Listmsgs, ConsumeOrderlyContext context) { for (MessageExt message : msgs) { System.out.println(message.getBody()); } } }); // 生产消息 producer.start(); producer.send(new Message("my_topic", "Hello, world!".getBytes())); // 消费消息 consumer.start(); } }
### 三、Pulsar####1. 简介Apache Pulsar 是一个分布式流数据平台,最初是由 Yahoo! 公司内部使用,后来开源给 Apache 基金会。Pulsar 的主要功能是作为一个高吞吐量的消息队列系统,能够处理大量的日志和事件数据。
####2. 架构Pulsar 的架构非常简单,主要包括以下几个组件:
* **Broker**: Pulsar 中的 Broker 是负责存储和传输消息的节点。每个 Broker 都有一个唯一的 ID。
* **Topic**: Topic 是一个逻辑上的消息队列,它可以被多个 Broker 共享。
* **Partition**: Partition 是一个物理上的消息存储单元,每个 Partition 可以分布在多个 Broker 上。
####3. 特点Pulsar 的一些特点包括:
* **高吞吐量**: Pulsar 支持非常高的吞吐量,能够处理数十万条消息每秒。
* **低延迟**: Pulsar 能够提供非常低的延迟,因为它使用了零拷贝技术和内存映射文件。
* **可扩展性**: Pulsar 的架构设计非常灵活,可以轻松地添加或删除 Broker 来适应系统的需求。
####4. 使用示例下面是一个简单的 Pulsar 示例,使用 Java 的 `pulsar-client` 库来生产和消费消息:
javaimport org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionName; public class PulsarExample { public static void main(String[] args) throws Exception { // 生产者配置 PulsarClient client = PulsarClient.builder() .serviceHttpUrl(" /> .build(); Producerproducer = client.newProducer() .topic("my_topic") .createIfNotExists(true) .blockIfBusy(false) .build(); // 消费者配置 Consumer consumer = client.newConsumer() .topic("my_topic") .subscriptionName(SubscriptionName.random()) .subscriptionType(SubscriptionType.Exclusive) .enableBatching(true) .batchingMaxMessages(1000) .batchingMaxBytes(1024 *1024) .build(); // 生产消息 producer.send("Hello, world!".getBytes()); // 消费消息 consumer.subscribe(); } }
以上就是关于 Kafka、RocketMQ 和 Pulsar 的简单介绍和使用示例。这些消息队列系统都支持高吞吐量、低延迟和可扩展性,能够处理大量的日志和事件数据。