[springboot配置Kafka] springboot配置多个kafka,包含账号密码
发布人:shili8
发布时间:2024-06-07 11:49
阅读次数:0
在实际的软件开发过程中,经常会用到消息队列工具来进行数据传输和处理。Kafka作为一种分布式消息队列系统,被广泛应用于各种场景中。而在使用SpringBoot进行开发时,配置Kafka也是一个比较常见的需求。在本文中,将介绍如何在SpringBoot项目中配置多个Kafka实例,并且包含账号密码认证的情况。
1. 集成Kafka依赖首先,在SpringBoot项目的pom.xml文件中添加Kafka的依赖:
xml<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
然后通过maven进行项目的重新构建,以便引入这个依赖。
2. 配置Kafka实例在SpringBoot项目的配置文件application.properties中,添加Kafka的配置信息:
properties# kafka1kafka1.bootstrap.servers=server1:9092,server2:9092kafka1.consumer.group-id=group1kafka1.consumer.auto-offset-reset=earliestkafka1.consumer.enable-auto-commit=truekafka1.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerkafka1.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerkafka1.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerkafka1.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerkafka1.security.protocol=SASL_PLAINTEXTkafka1.sasl.mechanism=PLAINkafka1.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="pass1"; # kafka2kafka2.bootstrap.servers=server3:9092,server4:9092kafka2.consumer.group-id=group2kafka2.consumer.auto-offset-reset=earliestkafka2.consumer.enable-auto-commit=truekafka2.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerkafka2.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerkafka2.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerkafka2.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerkafka2.security.protocol=SASL_PLAINTEXTkafka2.sasl.mechanism=PLAINkafka2.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user2" password="pass2";
上述配置中,我们配置了两个Kafka实例,分别是kafka1和kafka2。每个实例都有自己的bootstrap.servers、consumer和producer的属性配置。其中,security.protocol和sasl.mechanism是用来进行账号密码认证的配置。sasl.jaas.config中需要指定对应的用户名和密码。
3. 创建Kafka配置类在项目中创建Kafka配置类,用来获取配置文件中的Kafka属性,并创建KafkaTemplate和ConsumerFactory。
java@Configurationpublic class KafkaConfig { @Value("${kafka1.bootstrap.servers}") private String bootstrapServers1; @Value("${kafka2.bootstrap.servers}") private String bootstrapServers2; // kafka1 @Bean public KafkaTemplatekafkaTemplate1() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers1); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_ CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } @Bean public ConsumerFactory consumerFactory1() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers1); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); return new DefaultKafkaConsumerFactory<>(props); } // kafka2 @Bean public KafkaTemplate kafkaTemplate2() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers2); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_ CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=user2 password=pass2;"); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } @Bean public ConsumerFactory consumerFactory2() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers2); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=user2 password=pass2;"); return new DefaultKafkaConsumerFactory<>(props); } }
在这个配置类中,我们分别创建了两个KafkaTemplate和ConsumerFactory的Bean,用来连接kafka1和kafka2的实例。注意到在配置这些Bean的时候也需要设置security.protocol、sasl.mechanism和sasl.jaas.config,用来进行账号密码的认证。
4. 测试Kafka配置最后,我们可以在代码中使用创建好的KafkaTemplate和ConsumerFactory,来发送和接收消息。
java@Servicepublic class KafkaService { @Autowired private KafkaTemplatekafkaTemplate1; @Autowired private KafkaTemplate kafkaTemplate2; @Autowired private ConsumerFactory consumerFactory1; @Autowired private ConsumerFactory consumerFactory2; public void sendMessage(String topic, String message, boolean isKafka1) { if (isKafka1) { kafkaTemplate1.send(topic, message); } else { kafkaTemplate2.send(topic, message); } } public void consumeMessage(String topic, boolean isKafka1) { Consumer consumer; if (isKafka1) { consumer = consumerFactory1.createConsumer(); } else { consumer = consumerFactory2.createConsumer(); } consumer.subscribe(Collections.singleton(topic)); ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.println(record.value()); } consumer.close(); } }
在上面的KafkaService中,我们使用KafkaTemplate来发送消息,使用ConsumerFactory来创建一个Consumer实例来接收消息。isKafka1参数用来指明要使用哪个Kafka实例。
通过以上的配置和示例代码,我们可以在SpringBoot项目中轻松地集成并配置多个Kafka实例,并且包含账号密码认证。这些配置不仅能够满足常规的Kafka使用需求,同时也能够在实际的复杂项目中发挥作用。