
[Spring Boot Kafka configuration] Configuring multiple Kafka clusters in Spring Boot, with username/password authentication

Author: shili8  Published: 2024-06-07 11:49

Message queues are a common tool for moving and processing data in real-world software, and Kafka, as a distributed messaging system, is used in a wide range of scenarios. Configuring Kafka is therefore a frequent requirement in Spring Boot development. This article shows how to configure multiple Kafka clusters in a Spring Boot project, including username/password authentication.

1. Add the Kafka dependency

First, add the spring-kafka dependency to the project's pom.xml:

<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
</dependency>


Then rebuild the project with Maven so the new dependency is pulled in. When the project inherits from spring-boot-starter-parent, the spring-kafka version is managed automatically, so no <version> tag is needed.

2. Configure the Kafka clusters

In the project's application.properties file, add the configuration for both clusters:

# kafka1
kafka1.bootstrap.servers=server1:9092,server2:9092
kafka1.consumer.group-id=group1
kafka1.consumer.auto-offset-reset=earliest
kafka1.consumer.enable-auto-commit=true
kafka1.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka1.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka1.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka1.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka1.security.protocol=SASL_PLAINTEXT
kafka1.sasl.mechanism=PLAIN
kafka1.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="pass1";

# kafka2
kafka2.bootstrap.servers=server3:9092,server4:9092
kafka2.consumer.group-id=group2
kafka2.consumer.auto-offset-reset=earliest
kafka2.consumer.enable-auto-commit=true
kafka2.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka2.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka2.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka2.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka2.security.protocol=SASL_PLAINTEXT
kafka2.sasl.mechanism=PLAIN
kafka2.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="pass2";


The configuration above defines two Kafka clusters, kafka1 and kafka2. Each has its own bootstrap.servers plus consumer and producer settings. The security.protocol and sasl.mechanism entries enable username/password (SASL/PLAIN) authentication, and sasl.jaas.config carries the actual credentials. Note that kafka1.* and kafka2.* are custom prefixes, not Spring Boot's built-in spring.kafka.* namespace, so these properties must be read manually, as the next step shows.
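Because the sasl.jaas.config value is a plain string that the Kafka client parses, it is easy to get the quoting wrong when building it programmatically. As a hedged sketch (the class and helper names below are our own, not part of any library), the PLAIN-mechanism entry can be assembled like this:

```java
import java.util.HashMap;
import java.util.Map;

public class JaasConfigBuilder {

    // Builds the sasl.jaas.config value for the PLAIN mechanism.
    // The username and password must be wrapped in double quotes and
    // the whole entry must end with a semicolon, or the client rejects it.
    public static String buildPlainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    // Collects the three SASL-related settings into one map so the
    // producer and consumer configs of the same cluster can share them.
    public static Map<String, Object> saslProps(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", buildPlainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildPlainJaasConfig("user1", "pass1"));
    }
}
```

Sharing one such helper between the producer and consumer maps also prevents the two copies of the credentials from drifting apart.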

3. Create the Kafka configuration class

Create a configuration class that reads the Kafka properties from the configuration file and builds a KafkaTemplate and a ConsumerFactory for each cluster.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaConfig {

    @Value("${kafka1.bootstrap.servers}")
    private String bootstrapServers1;

    @Value("${kafka2.bootstrap.servers}")
    private String bootstrapServers2;

    // kafka1
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate1() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // The credentials must be double-quoted and the entry must end with a semicolon.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"pass1\";");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory1() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers1);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"pass1\";");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // kafka2
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers2);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user2\" password=\"pass2\";");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers2);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user2\" password=\"pass2\";");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}


In this configuration class we create a KafkaTemplate and a ConsumerFactory bean for each of the two clusters. Note that security.protocol, sasl.mechanism and sasl.jaas.config must be set on these beans as well, otherwise the clients cannot authenticate. The bean method names (kafkaTemplate1, consumerFactory1, and so on) become the bean names, which is how the two sets of beans are told apart when they are injected later.
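If you prefer annotation-driven consumption over creating consumers by hand, each ConsumerFactory can also be wrapped in its own listener container factory. The following is a configuration sketch, not a definitive setup (the class and bean names are our own); it assumes the consumerFactory1 bean from the configuration class above:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig {

    // One listener container factory per cluster, so @KafkaListener
    // methods can select a cluster via their containerFactory attribute.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory1(
            ConsumerFactory<String, String> consumerFactory1) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1);
        return factory;
    }
}
```

A listener method would then declare `@KafkaListener(topics = "some-topic", containerFactory = "kafkaListenerContainerFactory1")` to consume from the kafka1 cluster; a second factory bean wired to consumerFactory2 covers the other cluster.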

4. Test the Kafka configuration

Finally, we can use the KafkaTemplate and ConsumerFactory beans in application code to send and receive messages.

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    // Two beans of each type exist, so @Qualifier picks the intended one by name.
    @Autowired
    @Qualifier("kafkaTemplate1")
    private KafkaTemplate<String, String> kafkaTemplate1;

    @Autowired
    @Qualifier("kafkaTemplate2")
    private KafkaTemplate<String, String> kafkaTemplate2;

    @Autowired
    @Qualifier("consumerFactory1")
    private ConsumerFactory<String, String> consumerFactory1;

    @Autowired
    @Qualifier("consumerFactory2")
    private ConsumerFactory<String, String> consumerFactory2;

    public void sendMessage(String topic, String message, boolean isKafka1) {
        if (isKafka1) {
            kafkaTemplate1.send(topic, message);
        } else {
            kafkaTemplate2.send(topic, message);
        }
    }

    public void consumeMessage(String topic, boolean isKafka1) {
        Consumer<String, String> consumer = isKafka1
                ? consumerFactory1.createConsumer()
                : consumerFactory2.createConsumer();
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.close();
    }
}


In the KafkaService above, the KafkaTemplate beans send messages and the ConsumerFactory beans create Consumer instances to receive them; the isKafka1 parameter selects which cluster to use. Creating and closing a consumer on every call is fine for a quick test, but long-running applications normally consume through @KafkaListener instead.
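The boolean flag works for exactly two clusters; if more may be added later, a name-keyed registry avoids a growing if/else chain. Below is a minimal, framework-free sketch of that idea (ClusterRegistry is hypothetical, and the BiConsumer values stand in for KafkaTemplate::send):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class ClusterRegistry {

    // Maps a cluster name to a send operation taking (topic, message).
    // In the real service the registered values would be kafkaTemplate::send.
    private final Map<String, BiConsumer<String, String>> senders = new HashMap<>();

    public void register(String cluster, BiConsumer<String, String> sender) {
        senders.put(cluster, sender);
    }

    public void send(String cluster, String topic, String message) {
        BiConsumer<String, String> sender = senders.get(cluster);
        if (sender == null) {
            throw new IllegalArgumentException("unknown cluster: " + cluster);
        }
        sender.accept(topic, message);
    }
}
```

A caller would register each template once at startup (for example under the names "kafka1" and "kafka2") and then route by cluster name, which also gives a clear error for an unknown cluster instead of silently falling through an else branch.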

With the configuration and sample code above, a Spring Boot project can integrate multiple Kafka clusters, each with its own username/password authentication, and the same pattern extends naturally to additional clusters. In real projects, avoid hard-coding credentials in source code or plain-text configuration; inject them from environment variables or a secrets store instead.
