当前位置: 首页> 健康> 科研 > springboot 实现kafka多源配置

springboot 实现kafka多源配置

时间:2025/7/9 11:29:34来源:https://blog.csdn.net/qq_37362891/article/details/139391610 浏览次数:0次

文章目录

  • 背景
  • 核心配置
    • 自动化配置类
    • 注册生产者、消费者核心bean到spring
    • 配置spring.factories
    • yml配置
    • 使用
  • 源码仓库

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka
@Configuration(proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {public MyKafkaAutoConfiguration() {}public void setBeanFactory(BeanFactory beanFactory) throws BeansException {beanFactory.getBean(CustomKafkaDataSourceRegister.class);}
}

注册生产者、消费者核心bean到spring

public void afterPropertiesSet() {Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();if (factories != null && !factories.isEmpty()) {factories.forEach((factoryName, consumerConfig) -> {KafkaProperties.Listener listener = consumerConfig.getListener();Integer concurrency = consumerConfig.getConcurrency();// 创建监听容器工厂ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);// 注册到容器if (!beanFactory.containsBean(factoryName)) {beanFactory.registerSingleton(factoryName, containerFactory);}});}Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();if (!ObjectUtils.isEmpty(templates)) {templates.forEach((templateName, producerConfig) -> {//registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);//注册spring bean的两种方式registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));});}}

配置spring.factories


org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:kafka:multiple:consumer:factories:test-factory:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: 192.168.56.112:9092group-id: group_aconcurrency: 25fetch-min-size: 1048576fetch-max-wait: 3000listener:type: batchproperties:spring-json-trusted-packages: '*'key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latestproducer:templates:test-template:bootstrap-servers: 192.168.56.112:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

源码仓库

https://github.com/fafeidou/shield

关键字:springboot 实现kafka多源配置

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: