SpringBoot整合SpringCloudStream3.1+版本Kafka
发布人:shili8
发布时间:2025-02-09 04:27
阅读次数:0
**Spring Boot 整合 Spring Cloud Stream3.1 + Kafka**
在本文中,我们将介绍如何使用 Spring Boot 和 Spring Cloud Stream3.1 版本整合 Kafka。我们将创建一个生产者端和消费者端的应用程序,分别向 Kafka topic 发送消息并从 Kafka topic 中读取消息。
**依赖配置**
首先,我们需要在 `pom.xml` 文件中添加以下依赖:
xml<dependencies> <!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Cloud Stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <!-- Kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <!-- Spring Cloud Stream Binder Kafka Configuration --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-config</artifactId> </dependency> </dependencies>
**生产者端配置**
接下来,我们需要在 `application.properties` 文件中配置生产者端的 Kafka topic:
propertiesspring: cloud: stream: binder: kafka: bootstrap-servers: localhost:9092 key-serializer: org.springframework.cloud.stream.binder.kafka.serializer.KeySerdeDelegatingSerializer value-serializer: org.springframework.cloud.stream.binder.kafka.serializer.ValueSerdeDelegatingSerializer kafka: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
**生产者端代码**
现在,我们可以创建一个生产者端的应用程序,向 Kafka topic 发送消息:
java@SpringBootApplication@EnableBinding(Sink.class)
public class ProducerApplication {
@Autowired private Source source;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean public Source source() {
return new Source();
}
}
java@Componentpublic class Source implements Source {
@Override public void send(Payload payload) {
String message = (String) payload.getPayload();
System.out.println("Sending message: " + message);
// Send the message to Kafka topic kafkaTemplate.send("my-topic", message);
}
}
**消费者端配置**
接下来,我们需要在 `application.properties` 文件中配置消费者端的 Kafka topic:
propertiesspring: cloud: stream: binder: kafka: bootstrap-servers: localhost:9092 key-serializer: org.springframework.cloud.stream.binder.kafka.serializer.KeySerdeDelegatingSerializer value-serializer: org.springframework.cloud.stream.binder.kafka.serializer.ValueSerdeDelegatingSerializer kafka: consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
**消费者端代码**
现在,我们可以创建一个消费者端的应用程序,从 Kafka topic 中读取消息:
java@SpringBootApplication@EnableBinding(Sink.class)
public class ConsumerApplication {
@Autowired private Sink sink;
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean public Sink sink() {
return new Sink();
}
}
java@Componentpublic class Sink implements Sink {
@Override public void receive(Payload payload) {
String message = (String) payload.getPayload();
System.out.println("Received message: " + message);
// Process the message processMessage(message);
}
private void processMessage(String message) {
System.out.println("Processing message: " + message);
// Add your business logic here }
}
**总结**
在本文中,我们介绍了如何使用 Spring Boot 和 Spring Cloud Stream3.1 版本整合 Kafka。我们创建了一个生产者端和消费者端的应用程序,分别向 Kafka topic 发送消息并从 Kafka topic 中读取消息。

