Integrating Spring Boot with Spring Cloud Stream 3.1+ and Kafka

Published by: shili8 Published: 2025-02-09 04:27

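This article shows how to integrate Kafka into a Spring Boot application using Spring Cloud Stream 3.1 or later. We build two applications: a producer that sends messages to a Kafka topic and a consumer that reads messages from that topic.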

**Dependencies**

First, add the following dependencies to `pom.xml`. Versions are omitted on the assumption that the project inherits them from the Spring Boot parent and imports the `spring-cloud-dependencies` BOM.

    <dependencies>
        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Cloud Stream Kafka binder.
             It brings in spring-cloud-stream and kafka-clients transitively,
             so neither has to be declared separately. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>


**Producer configuration**

Next, configure the Kafka broker and the producer's output binding. The settings below use YAML syntax, so they belong in `application.yml` rather than `application.properties`. The binding name `producer-out-0` is an arbitrary name for the output binding; the producer code must send to the same name. It is mapped to the topic `my-topic`. With the default settings, Spring Cloud Stream converts the payload to bytes itself, so no Kafka serializer classes have to be configured.

    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            producer-out-0:
              destination: my-topic
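Spring Boot accepts the same settings either as nested YAML or as flat dotted keys in a `.properties` file. As a rough illustration of the flat form, the sketch below parses two such keys with plain `java.util.Properties`, without starting Spring. The binding name `producer-out-0` and the class name `FlatBinderProperties` are hypothetical names chosen for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class FlatBinderProperties {

    // Flat, dotted-key form of the settings, as they would appear in application.properties.
    // "producer-out-0" is a hypothetical binding name used only for this illustration.
    static final String FLAT_CONFIG = String.join("\n",
            "spring.cloud.stream.kafka.binder.brokers=localhost:9092",
            "spring.cloud.stream.bindings.producer-out-0.destination=my-topic");

    // Parses the text with the JDK's own properties parser (no Spring involved).
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties", e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(FLAT_CONFIG);
        System.out.println(props.getProperty("spring.cloud.stream.kafka.binder.brokers"));
    }
}
```

Each level of YAML nesting corresponds to one dot-separated segment of the flat key, which is why YAML content pasted into a `.properties` file is not read as intended.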


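**Producer code**

Now create the producer application, which sends messages to the Kafka topic. Since Spring Cloud Stream 3.1, the annotation-based model (`@EnableBinding`, `Source`, `Sink`) is deprecated, so the application class needs no binding annotations.

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    // No @EnableBinding: the Kafka binder on the classpath is auto-configured.
    @SpringBootApplication
    public class ProducerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }
    }

Messages are sent through `StreamBridge`. The component is named `MessageProducer` rather than `Source` so that it does not clash with the framework interface of that name.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.stereotype.Component;

    @Component
    public class MessageProducer {

        @Autowired
        private StreamBridge streamBridge;

        public void send(String message) {
            System.out.println("Sending message: " + message);
            // The binding name must match the one configured under
            // spring.cloud.stream.bindings; its destination is the Kafka topic.
            streamBridge.send("producer-out-0", message);
        }
    }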
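Besides sending imperatively, Spring Cloud Stream 3.1+ also lets a `java.util.function.Supplier` bean act as a message source that the framework polls. The sketch below shows only such a supplier in plain Java, without Spring or Kafka, so that it can run standalone. The class name `GreetingSource` and the message text are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class GreetingSource {

    // Counter so that every polled message is distinguishable.
    private final AtomicInteger counter = new AtomicInteger();

    // In a Spring application this method would carry @Bean, and the framework
    // would call get() periodically and publish each result to the bound topic.
    Supplier<String> greetings() {
        return () -> "hello-" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        Supplier<String> supplier = new GreetingSource().greetings();
        // Simulate three polls by calling get() directly.
        for (int i = 0; i < 3; i++) {
            System.out.println("Sending message: " + supplier.get());
        }
    }
}
```

Because the supplier is an ordinary Java function, it can be unit tested by calling `get()` directly, with no broker involved.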


**Consumer configuration**

Next, configure the Kafka broker and the consumer's input binding. As on the producer side, the settings use YAML syntax and belong in `application.yml`. The input binding `receive-in-0` belongs to a function bean named `receive` and reads from the topic `my-topic`. The consumer group name `my-group` is only an example. No Kafka deserializer classes are needed with the default settings, because Spring Cloud Stream converts the incoming bytes to the handler's argument type.

    spring:
      cloud:
        function:
          definition: receive
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            receive-in-0:
              destination: my-topic
              group: my-group
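In the functional programming model of Spring Cloud Stream 3.1+, binding names are derived from the function bean name: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The helper below is a hypothetical illustration of that naming convention and of the property key that maps a binding to a topic. It is not framework code, and the function name `receive` is an assumption made for the example.

```java
class BindingNames {

    // Input binding name for a function bean, e.g. ("receive", 0) -> "receive-in-0".
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name for a function bean, e.g. ("process", 0) -> "process-out-0".
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Property key under which the destination (Kafka topic) of a binding is configured.
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        System.out.println(destinationKey(input("receive", 0)));
    }
}
```

A mismatch between the bean name and the binding name in the configuration is a common reason for a consumer that starts but never receives messages, so it helps to check the derived name first.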


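**Consumer code**

Now create the consumer application, which reads messages from the Kafka topic. As with the producer, the application class needs no `@EnableBinding` annotation.

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class ConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
    }

The message handler is a `java.util.function.Consumer` bean. The bean name `receive` determines the input binding name `receive-in-0`. The class is named `MessageConsumer` rather than `Sink` so that it does not clash with the framework interface of that name.

    import java.util.function.Consumer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class MessageConsumer {

        // Invoked by the framework for every message arriving on "receive-in-0".
        @Bean
        public Consumer<String> receive() {
            return message -> {
                System.out.println("Received message: " + message);
                processMessage(message);
            };
        }

        private void processMessage(String message) {
            System.out.println("Processing message: " + message);
            // Add your business logic here
        }
    }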
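The receive-then-process flow can be tried out without a broker. In the functional model of Spring Cloud Stream 3.1+, a message handler is a plain `java.util.function.Consumer`, so it can be called directly. The sketch below is such a standalone exercise. The class `MessageHandler`, its `processed` list, and the upper-casing "business logic" are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

class MessageHandler {

    // Records processed messages so that a test can inspect them.
    final List<String> processed = new ArrayList<>();

    // In a Spring application this method would be annotated with @Bean.
    Consumer<String> receive() {
        return message -> {
            System.out.println("Received message: " + message);
            processMessage(message);
        };
    }

    // Placeholder business logic: trim the message and upper-case it.
    private void processMessage(String message) {
        processed.add(message.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        MessageHandler handler = new MessageHandler();
        // Call the handler directly instead of waiting for a Kafka message.
        handler.receive().accept(" hello ");
        System.out.println(handler.processed);
    }
}
```

Keeping the handler free of Kafka-specific types is what makes this kind of direct call possible; only the binding configuration ties it to a topic.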


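**Summary**

This article showed how to integrate Kafka into a Spring Boot application using Spring Cloud Stream 3.1 or later. We created a producer application that sends messages to a Kafka topic and a consumer application that reads messages from that topic.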
