Integrating Spring Boot with Spring Cloud Stream 3.1+ and Kafka
Posted by: shili8
Published: 2025-02-09 04:27
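**Spring Boot + Spring Cloud Stream 3.1 + Kafka**
This article shows how to integrate Kafka using Spring Boot and Spring Cloud Stream 3.1. We build two applications: a producer that sends messages to a Kafka topic and a consumer that reads messages from that topic.
**Dependencies**
First, add the following dependencies to `pom.xml`. No versions are listed, on the assumption that the Spring Boot parent and the Spring Cloud BOM manage them: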
```xml
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream Kafka binder (brings in Spring Cloud Stream itself) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <!-- Kafka client (optional: the binder already pulls it in transitively) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
```
**Producer Configuration**
Next, configure the Kafka connection for the producer. The settings use YAML syntax, so they belong in `application.yml`:
```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Address of the Kafka broker(s)
          brokers: localhost:9092
# No serializers are configured: by default the binder converts the
# message payload to byte[] before handing it to Kafka.
```
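**Producer Code**
Now create the producer application, which sends messages to a Kafka topic. The annotation model (`@EnableBinding`, `Source`, `Sink`) is deprecated as of Spring Cloud Stream 3.1, so the producer uses `StreamBridge` instead:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}
```
```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    private final StreamBridge streamBridge;

    // StreamBridge is auto-configured by Spring Cloud Stream and injected here
    public MessageProducer(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void send(String message) {
        System.out.println("Sending message: " + message);
        // "my-topic" is not a pre-declared binding, so StreamBridge creates an
        // output binding on the fly whose destination is the Kafka topic "my-topic".
        streamBridge.send("my-topic", message);
    }
}
```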
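Besides sending on demand, Spring Cloud Stream 3.1 can also treat a `java.util.function.Supplier` bean as a message source and poll it on a schedule (once per second by default). The following is a minimal sketch of such a supplier in plain Java, with no Spring annotations, so that it can be run and tested on its own. The class name `GreetingSupplier`, the method name `greetings`, and the `message-N` payload format are assumptions made for illustration; in a real application the method would be a `@Bean` in a `@Configuration` class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical message source; names and payload format are illustrative only. */
final class GreetingSupplier {

    private GreetingSupplier() {
    }

    /** Returns a supplier that produces "message-1", "message-2", ... on each call. */
    static Supplier<String> greetings() {
        AtomicInteger counter = new AtomicInteger();
        return () -> "message-" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        Supplier<String> supplier = greetings();
        // Each get() stands in for one poll by the framework
        System.out.println(supplier.get()); // message-1
        System.out.println(supplier.get()); // message-2
    }
}
```

Keeping the supplier free of framework types means the message-generation logic can be checked without a running Kafka broker.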
**Consumer Configuration**
Next, configure the Kafka topic for the consumer. As with the producer, the settings use YAML syntax and belong in `application.yml`:
```yaml
spring:
  cloud:
    function:
      # Name of the Consumer<String> bean that handles incoming messages
      definition: receive
    stream:
      kafka:
        binder:
          # Address of the Kafka broker(s)
          brokers: localhost:9092
      bindings:
        # Input binding of the "receive" function, mapped to the Kafka topic
        receive-in-0:
          destination: my-topic
```
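In the functional model of Spring Cloud Stream 3.1, binding names are derived from the function bean name: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below only spells out that naming convention in plain Java; the `BindingNames` class and its methods are hypothetical helpers, not part of Spring Cloud Stream.

```java
/** Hypothetical helper illustrating the functional binding naming convention. */
final class BindingNames {

    private BindingNames() {
    }

    /** Input binding name for a function bean, e.g. "receive-in-0". */
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name for a function bean, e.g. "greetings-out-0". */
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Property key that maps a binding to its destination (the Kafka topic). */
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        String binding = input("receive", 0);
        System.out.println(binding);                      // receive-in-0
        System.out.println(destinationProperty(binding)); // spring.cloud.stream.bindings.receive-in-0.destination
    }
}
```

For a consumer bean named `receive`, the topic is therefore set through the `spring.cloud.stream.bindings.receive-in-0.destination` property.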
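**Consumer Code**
Now create the consumer application, which reads messages from the Kafka topic. In Spring Cloud Stream 3.1 a consumer is declared as a `java.util.function.Consumer` bean rather than with `@EnableBinding(Sink.class)`:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
```
```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConsumer {

    // The bean name "receive" yields the input binding "receive-in-0"; map it to a
    // topic with spring.cloud.stream.bindings.receive-in-0.destination.
    @Bean
    public Consumer<String> receive() {
        return message -> {
            System.out.println("Received message: " + message);
            // Process the message
            processMessage(message);
        };
    }

    private void processMessage(String message) {
        System.out.println("Processing message: " + message);
        // Add your business logic here
    }
}
```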
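Because a functional consumer is just a `java.util.function.Consumer`, its message-handling logic can be exercised without Kafka by calling `accept` directly. The following plain-Java sketch illustrates the idea. The `RecordingHandler` class and its placeholder logic (trim the message, skip empty ones, record the rest) are assumptions standing in for real business logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical handler whose placeholder logic records every non-empty message. */
final class RecordingHandler {

    private final List<String> processed = new ArrayList<>();

    /** The function that would be exposed as a @Bean in a Spring application. */
    Consumer<String> receive() {
        return message -> {
            String trimmed = message.trim();
            if (!trimmed.isEmpty()) {
                processed.add(trimmed); // placeholder for real business logic
            }
        };
    }

    /** Returns a copy of the messages handled so far. */
    List<String> processed() {
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Consumer<String> receive = handler.receive();
        // Each accept() stands in for one message delivered by the binder
        receive.accept(" hello ");
        receive.accept("   ");
        receive.accept("world");
        System.out.println(handler.processed()); // [hello, world]
    }
}
```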
**Summary**
This article showed how to integrate Kafka using Spring Boot and Spring Cloud Stream 3.1. We built a producer application that sends messages to a Kafka topic and a consumer application that reads messages from it.