@KafkaListener和KafkaTemplate自动装配原理分析

news/2025/2/26 7:37:33

依赖项和配置信息参见另一篇博文《@KafkaListener的配置使用》，本文主要借助源码分析 @KafkaListener 和 KafkaTemplate 的自动装配原理。

1、KafkaAutoConfiguration 源码分析

KafkaAutoConfiguration 类自动装配生成了生产者客户端 KafkaTemplate 的 bean 和消费者基础 ConsumerFactory 的 bean；KafkaAutoConfiguration 通过 @Import 导入 KafkaAnnotationDrivenConfiguration，后者最终生成了 ConcurrentKafkaListenerContainerFactory 的 bean。该 bean 是 @KafkaListener 默认使用的监听器容器工厂，其内部持有 ConsumerFactory，从而决定了消费者连接的 Kafka 集群。

package org.springframework.boot.autoconfigure.kafka;

import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration(
    proxyBeanMethods = false
)
/// Only loaded when KafkaTemplate is present on the classpath (i.e. spring-kafka is a dependency).
@ConditionalOnClass({KafkaTemplate.class})
/// Binds the spring.kafka.* configuration properties into a KafkaProperties bean.
@EnableConfigurationProperties({KafkaProperties.class})
/// Merges in KafkaAnnotationDrivenConfiguration (listener-container support for @KafkaListener)
/// and KafkaStreamsAnnotationDrivenConfiguration (Kafka Streams support).
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
public class KafkaAutoConfiguration {

    /// Externalized spring.kafka.* configuration, injected via the constructor.
    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    /// Auto-creates the producer-side KafkaTemplate only when the user has not defined one.
    @ConditionalOnMissingBean({KafkaTemplate.class})
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        // Apply a RecordMessageConverter only when exactly one candidate bean exists.
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        // Default topic is taken from spring.kafka.template.default-topic.
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    /// Producer factory built from spring.kafka.producer.* settings; a user-defined
    /// ProducerFactory bean takes precedence.
    @ConditionalOnMissingBean({ProducerFactory.class})
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            // Setting a transaction-id prefix switches the factory into transactional mode.
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }

        // Let any DefaultKafkaProducerFactoryCustomizer beans post-process the factory, in @Order order.
        customizers.orderedStream().forEach((customizer) -> {
            customizer.customize(factory);
        });
        return factory;
    }

    @Bean
    /// Default ProducerListener (a LoggingProducerListener), used when none is defined.
    @ConditionalOnMissingBean({ProducerListener.class})
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener();
    }

    @Bean
    /// Auto-creates the consumer-side ConsumerFactory only when the user has not defined one;
    /// it carries the spring.kafka.consumer.* settings (bootstrap servers, group id, ...).
    @ConditionalOnMissingBean({ConsumerFactory.class})
    public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
        // Let any DefaultKafkaConsumerFactoryCustomizer beans post-process the factory, in order.
        customizers.orderedStream().forEach((customizer) -> {
            customizer.customize(factory);
        });
        return factory;
    }

    @Bean
    /// Kafka transaction manager, created only when spring.kafka.producer.transaction-id-prefix is set.
    @ConditionalOnProperty(
        name = {"spring.kafka.producer.transaction-id-prefix"}
    )
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }

    @Bean
    /// JAAS login-module initializer, created only when spring.kafka.jaas.enabled is set.
    @ConditionalOnProperty(
        name = {"spring.kafka.jaas.enabled"}
    )
    @ConditionalOnMissingBean
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        // Control flag and login module are optional; only applied when configured.
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }

        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }

        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean
    /// Kafka admin client bean built from spring.kafka.admin.* properties.
    @ConditionalOnMissingBean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        // Maps spring.kafka.admin.fail-fast onto fatalIfBrokerNotAvailable.
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }
}

2、KafkaAnnotationDrivenConfiguration 源码分析

package org.springframework.boot.autoconfigure.kafka;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;

/**
 * Configuration for Kafka annotation-driven support.
 *
 * <p>Collects optional collaborators (message converters, reply template,
 * transaction manager, error handlers, interceptor) via {@code ObjectProvider}
 * and wires them into the default {@code ConcurrentKafkaListenerContainerFactory}
 * used by {@code @KafkaListener}.
 *
 * @author Gary Russell
 * @author Eddú Meléndez
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    // Each collaborator below is resolved with getIfUnique(): the field is non-null
    // only when exactly one matching bean exists in the application context.
    private final RecordMessageConverter messageConverter;

    private final BatchMessageConverter batchMessageConverter;

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    private final KafkaAwareTransactionManager<Object, Object> transactionManager;

    private final ConsumerAwareRebalanceListener rebalanceListener;

    private final ErrorHandler errorHandler;

    private final BatchErrorHandler batchErrorHandler;

    private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

    private final RecordInterceptor<Object, Object> recordInterceptor;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
          ObjectProvider<RecordMessageConverter> messageConverter,
          ObjectProvider<BatchMessageConverter> batchMessageConverter,
          ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
          ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
          ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
          ObjectProvider<BatchErrorHandler> batchErrorHandler,
          ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
          ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
       this.properties = properties;
       this.messageConverter = messageConverter.getIfUnique();
       // Falls back to a BatchMessagingMessageConverter wrapping the record converter.
       this.batchMessageConverter = batchMessageConverter
             .getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
       this.kafkaTemplate = kafkaTemplate.getIfUnique();
       this.transactionManager = kafkaTransactionManager.getIfUnique();
       this.rebalanceListener = rebalanceListener.getIfUnique();
       this.errorHandler = errorHandler.getIfUnique();
       this.batchErrorHandler = batchErrorHandler.getIfUnique();
       this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
       this.recordInterceptor = recordInterceptor.getIfUnique();
    }

    @Bean
    @ConditionalOnMissingBean
    /// Configurer that transfers the collected collaborators plus KafkaProperties
    /// onto the listener container factory created below.
    ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
       ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
       configurer.setKafkaProperties(this.properties);
       // spring.kafka.listener.type decides between batch and single-record conversion.
       MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
             ? this.batchMessageConverter : this.messageConverter;
       configurer.setMessageConverter(messageConverterToUse);
       configurer.setReplyTemplate(this.kafkaTemplate);
       configurer.setTransactionManager(this.transactionManager);
       configurer.setRebalanceListener(this.rebalanceListener);
       configurer.setErrorHandler(this.errorHandler);
       configurer.setBatchErrorHandler(this.batchErrorHandler);
       configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
       configurer.setRecordInterceptor(this.recordInterceptor);
       return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    /// Auto-creates the ConcurrentKafkaListenerContainerFactory bean when the user
    /// has not defined one under this name — this is the container factory that
    /// @KafkaListener uses by default.
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
          ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
          ConsumerFactory<Object, Object> kafkaConsumerFactory) {
       ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
       configurer.configure(factory, kafkaConsumerFactory);
       return factory;
    }

    /// Applies @EnableKafka only if the user has not already done so, detected via
    /// the absence of the @KafkaListener annotation-processor bean.
    @Configuration(proxyBeanMethods = false)
    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    static class EnableKafkaConfiguration {

    }

}

http://www.niftyadmin.cn/n/5868315.html

相关文章

03、Hadoop3.x从入门到放弃,第三章:Windows测试环境搭建

Hadoop3.x从入门到放弃，第三章：Windows测试环境搭建 一、Windows测试环境搭建 预先安装好JDK环境，这里不再赘述。 1、下载Hadoop相关文件 Hadoop各版本安装包：https://archive.apache.org/dist/hadoop/common/ 【我选择的是ha…

上证50期权代码是什么?上证50股指期权数据从哪里可以找到?

说起期权代码，其实期权代码是期权合约的唯一标识，通过代码可以准确地识别不同的期权合约，所以在期权交易中，了解期权代码是至关重要的一环。 上证50期权代码结构 上证50ETF期权代码由17位字符组成，例如“10002500C240…

设计模式 简单汇总

设计模式是软件工程中广泛使用的一套解决方案，用于解决常见问题并提高代码的质量。它们分为创建型、结构型和行为型三类，共23种模式。以下是各类别及其常见模式的详细说明： 目录 创建型模式 结构型模式 行为型模式 创建型模式 这些模式关注对象…

DeepSeek点燃AI大模型战火:编程语言争霸,谁将问鼎“终极武器”王座?

DeepSeek点燃AI大模型战火：编程语言争霸，谁将问鼎“终极武器”王座？ 一、DeepSeek：AI大模型竞赛的“导火索” 2023年，中国AI公司深度求索（DeepSeek）发布DeepSeek-R1大模型，凭借其超…

第七章:消息管理模块

目录 第一节：代码实现 1-1.消息持久化管理思想 1-2.MessageMapper类 1-3.QueueMessage类 1-4.MessageManager 第二节：单元测试 下期预告： 消息管理模块在mqserver下实现。 第一节：代码实现 消息管理首先需要消息类，…

交换机与路由器连接方式

交换机和路由器连接的三种主要方式如下： 一、直连连接 这是最简单直接的连接方式。通过一根网线将交换机的一个端口与路由器的一个LAN端口相连。这种连接方式适用于小型网络，其中交换机负责局域网内部的数据交换，而路由器则负责将内部网络连接…

回归分析中的回归含义的理解

“回归”这个词在回归分析中有着特定的历史背景和统计意义，它的含义与现代汉语中的“回归”有所不同。以下是详细的解释： 1. 回归的起源 历史背景：回归分析最早由英国统计学家弗朗西斯·高尔顿（Francis Galton）在19世纪…

SpringBoot使用Jasypt对YML文件配置内容进行加密(例:数据库密码加密)

SpringBoot使用Jasypt对YML文件配置内容进行加密（例：数据库密码加密） 前言 在SpringBoot的项目开发中，大多数情况下 yml 配置文件中存储的密码均以明文形式展示，这种方式显然存在较大的安全隐患。一旦有开发人员离职…