ActiveMQ 自动配置

ActiveMQ 是 Apache 提供的一个开源的消息系统,很好地支持了 JMS 规范。在使用ActiveMQ 时需要在 pom 文件中引入 spring-boot-starter-activemq。

ActiveMQ 在 Spring Boot 的自动配置类注册同样在 META-INF/spring.factories 中。

# Auto Configureorg. springframework . boot . autoconfigure . EnableAutoConfiguration=org. springframework . boot . autoconfigure . jms . activemq. ActiveMQAutoConfiguration, 

ActiveMQAutoConfiguration 类没有具体代码实现,主要通过注解完成功能的实现。

@Configuration(proxyBeanMethods = false)@AutoConfigureBefore (JmsAutoConfiguration.class)@AutoConfigureAfter({ JndiConnectionF actoryAutoConfiguration.class })@Conditional0nClass({ ConnectionFactory. class, ActiveMQConnectionFactory.cl@ConditionalOnMissingBean(ConnectionFactory.class)@EnableConfigurationProperties({ ActiveMQProperties. class, JmsProperties.class })@Import({ ActiveMQXAConnectionF actoryConfiguration. class,ActiveMQConnectionFactoryConfiguration.class })public class ActiveMQAutoConfiguration {}

@AutoConfigureBefore 注解指定该类需在 JmsAutoConfiguration 配置之前进行初始化。前面我们已经讲过,JmsAutoConfiguration 初始化时需 要用到 ActiveMQAutoConfiguration初始化的 ConnectionFactory,因此需要在 JmsAutoConfiguration 之前完成初始化。

@AutoConfigureAfter 中指定了在 JndiConnectionFactoryAutoConfiguration 配置完成之后进行初始化。

JndiConnectionFactoryAutoConfiguration 中主要通过 JNDI 的方式获得 ConnectionFactory实例下面我先看 JndiConnectionFactoryAutoConfiguration 的初始化 Condition 条件代码。

@Configuration(proxyBeanMethods = false)@AutoConfigureBefore (JmsAutoConfiguration. class)@ConditionalOnClass (Jms Template.class)@ConditionalOnMi ssingBean(ConnectionFactory. class)@Conditional (JndiOrPropertyCondition. class)@EnableConfigurationProperties ( JmsProperties. class)public class JndiConnectionF actoryAutoConfiguration {2private static final String[] JNDI_ LOCATIONS = { "java:/JmsXA", "java:/XAConne -ctionFactory" };// JNDI 名称或特定属性的条static class JndiOrPropertyCondition extends AnyNestedCondition {JndiOrPropertyCondition() {super(ConfigurationPhase. . PARSE_ CONFIGURATION);@ConditionalOnJndi({ "java:/JmsXA", "java:/XAConnectionFactory" })static class Jndi {@Conditiona lOnProperty(prefix = "spring.jms", name = "jndi -name")static class Property {}}}

我们不再赘述 JndiConnectionFactoryAutoConfiguration 初始化条件中的常规项,重点看@Conditional 指 定 的 JndiOrPropertyCondition 条 件 。

该 类 的 实 现 就 在JndiConnectionFactory-AutoConfiguration 内部,通过继承 AnyNestedCondition 来实现判断条件:当指定的 JNDI 生效或 spring.jmsjndi-name 被配置时才会生效。SpringBoot 默认会检查 java:/JmsXA 和 java:/XAConnectionFactory 两个 JNDI 地址。

25b23990c834d85e5965d27c01eb345c.png

看完注解部分再来看内部的实现。

//省略炷解public class JIndiConnectionF actoryAutoConfiguration {//该地址数组,与下面的条件保持一致private static final String[] JNDI_ LOCATIONS = { "java:/JmsXA", "java:/XAConnec -tionFactory" };@Beanpublic ConnectionFactory connectionF actory(JmsProperties properties) thIndiLocatorDelegate jndiLocatorDelegate = JndiLocatorDelegate . create-DefaultResourceRefLocator();//如果配置文件中配置了 JNDI 名称,则通过捐定的 INDI 名称获段 ConnectionFactorif (StringUtils .hasLength(properties . getJndiName())) {return jndiLocatorDelegate . lookup(properties . getJndiName(), Conne -ctionFactory.class);/如果配置文件中未配置 JNDI 名称, 则使用默认的名称进行查我return findJndiConnectionF actory(jndiLocatorDelegate);private Connect ionFactoryfindJndiConnectionFactory(IndiLocatorDelegate jindilocator-Delegate) {//遍历默认值并进行查找for (String name : JNDI_LOCATIONS) {try {return jndiLocatorDelegate . lookup(name, ConnectionFactory.class);} catch (NamingException ex) {//吞没异常,并继续} throw new IllegalStateException("Unable to find ConnectionFactory in JNDI location”+ Arrays.asList(JNDIATIONS));}}

获得 ConnectionFactory 的过程比较简单,首先判断配置文件中是否配置了指定的 JNDI 名称,如果配置了,便按照配置进行查找;如果未配置,则遍历默认 JNDI 名称数组,进行查找。

查找功能由 JndiL ocatorDelegate 提供,该类继承自 JndiLocatorSupport,提供了公共的查找(lookup) 方法,可以方便地用作委托类。

我们继续看 ActiveMQAutoConfiguration 的注解,@ConditionalOnClass 指定了 classpath中 必 须 有 ConnectionFactory 类 和 ActiveMQConnectionFactory 类 存 在 。 同 时 ,@Conditional-OnMissingBean 指定容器中不能存在 ConnectionFactory 的 Bean。

@EnableConfigurationProperties 属性导入了 ActiveMQ 的 ActiveMQProperties 配置和JMS 的 JmsProperties 配置。

最后@lmport 引入了自动配置 ActiveMQXAConnectionFactoryConfiguration 和自动配置ActiveMQConnectionFactoryConfiguration.

ActiveMQXAConnectionFactoryConfiguration 主要用来初始化 ConnectionFactory,包含两部分内容:创建 XA 连接工厂和创建普通连接工厂。

@Configuration(ConnectionFactory. class)@Conditional0nClass (TransactionManager. class)@ConditionalOnBean(XAConnectionFactoryWrapper . class)@ConditionalOnMissingBean(ConnectionFactory. class)class ActiveMQXAConnectionF actoryConfiguration {//创建 XA 连接 I 厂@Primary@Bean(name = { "jmsConnectionFactory", "xaJmsConnectionFactory" })public ConnectionFactory jmsConnectionF actory(ActiveMQProperties properties,objectProvider factoryCustomizers ,XAConnectionF actoryWrapperwrapper) throws Exception {ActiveMQXAConnectionFactory connectionFactory = new ActiveMQConnection-FactoryFactory(properties, factoryCustomizers . orderedStream(). collect(Collectorsist()))“8. createConnectionFactory(ActiveMQXAConnectionFactory. class);return wrapper . wrapConnectionFactory (connectionFactory);//创建普通连接工厂@Bean@Conditiona lOnProperty(prefix = "spring. activemq. pool", name = "enabled" ,havingValue = "false", matchIfMissing = true)public ActiveMQConnectionF actory nonXaJmsConnectionF actory (ActiveMQProperties propertie5,objectProvider factoryCustomizers) {return new ActiveMQConnectionF actoryFactory(properties,factoryCustomizers . orderedstream(). collect(Collectors . tolist())). createConnectionFactory(ActiveMQConnectionFactory. class);}}

如 果 没 有 干 预 , 该 自 动 配 置 类 是 不 会 自 动 生 效 的 , 只 有 当 满 足XAConnectionFactory-Wrapper 的 Bean 存在时, 才会进行实例化操作,默认状态下是没有对应的 Bean 的。

jmsConnectionFactory 中 直 接 创 建 了 ActiveMQXAConnectionFactory 类 , 然 后 通 过XAConnectionFactoryWrapper 包装类的包装将其注册到 JTATransactionManager.nonXaJmsConnectionFactory 方法直接创建 ActiveMQConnectionFactoryFactory 对象,并将配置属性和定制化操作传入。ActiveMQConnectionFactoryFactory 本质上也是一个ConnectionFactory。

另外一个导入类 ActiveMQConnectionFactoryConfiguration 主要用来配置 ActiveMQ 的ConnectionFactory,提供了基于池化的 ConnectionFactory、基于缓存的 ConnectionFactory和普通的 ActiveMQ ConnectionFactory.ActiveMQConnectionFactoryConfiguration 中关于以上 3 种 ConnectionFactory 的初始化操作都比较简单,我们直接看核心代码。首先看普通的 ActiveMQ 连接工厂的初始化代码。

@Bean@ConditionalOnProperty(prefix = "spring . jms .cache", name = "enabled", havinValue = "false")ActiveMQConnectionF actory jmsConnectionF actory(ActiveMQProperties propertie5,ObjectProvider factoryCustomizers) {return createJmsConnectionFactory(properties, factoryCustomizers);private static ActiveMQConnectionFactory createJmsConnectionF actory(ActiveMProperties properties,ObjectProvider factoryCustomizers) {return new ActiveMQConnectionFactoryFactory(properties ,factoryCustomizers . orderedStream(). collect(Collectors . tolist())). createConnectionF actory(Act iveMQConnectionFactory.class);}

该连接工厂的初始化需要指定配置 spring.jms cache .enabled=false 时才会进行,初始化操作就是创建一个 ActiveMQConnectionFactoryFactory 对象,并将配置属性和定制化操作传入,然后调用其 createConnectionFactory 方法,完成 ActiveMQConnectionFactory 的创建。

这里简单看一下 ActiveMQConnectionFactoryFactory 创建过程中的核心代码。

private  T createConnectionFactoryInstce(Class factoryClass)throws InstantiationException, IllegalAccessException, InvocationTarget -Exception, NoSuchMethodException {String brokerUrl = determineBrokerUrl();String user = this . properties. getUser();String password this.properties.getPassword);f (StringUtils.hasLength(user) & StringUtils.hasLength(password)) {gclassot ”factoryClass. getConstructor(String.class, String.class, StrinnewInstance(user, password, brokerUrl);return factoryClass . getConstructor(String. class) . newInstance(brokerUr1);)

上面代码中获得 brokerUrl、 用户名、密码等内容,然后通过 ActiveMQConnection-Factory的 Class 对象 获取构造器并创建对象。

其中关于 brokerUrl 的获取,如果配置文件中指定了 brokerUr,则使用指定的,如果未指定并且 inMemory 配 置 项 为 true ( 默 认 为 true) , 则 brokerUr 默 认 为“vm://ocalhost?broker.persistent=false"其他情况则 brokerUrl 为 tp:/localhost:61616",相关代码如下。

class ActiveMOConnectionFactoryFactory {private static final String DEFAULT_ EMBEDDED BROKER URL = "vm://localhost?broker . persistent-false";private static final String DEFAULT _NETWORK_ BROKER_ _URL = "tcp://localhost:61616";str IngdetermineBrokerurto tnull) {f (this.properties.getBrokerUr1()return this.properties. getBrokerUrl();if (this.properties. isInMemory()) {return DEFAULT_ EMBEDDED_ BROKER_ URL ;return DEFAULT_ NETWORK_ BROKER_ URL ;}}

在 ActiveMQConnectionF actoryF actory 中完成 ActiveMQConnectionFactory 对象创建之后,返回之前还进行了- -系列的定制化操作,相关代码如下。

private  T doCreateConnectionFactory(Class-T> factoryClass) throws Exception {//获得 ActiveMQConnectionFactory” factory = createConnectionFactoryInstance(factoryClass);//设置关闭超时时间if (this. properties . getCloseTimeout() != null) {factory. setCloseTimeout((int) this . properties . getCloseTimeout() . toMillis());(0);设置发送超时时间F (this. properties . getSendTimeout() != null) {factory . setSendTimeout((int) this . properties . getSendTimeout() . toMillis();//设置信任范围Packages packages = this.properties . getPackages();if (packages . getTrustAll() != nu1l)factory . setTrustAllPackages (packages . getTrustAll());if (!packages .getTrusted(). isEmpty()) {factory . setTrustedPackages (packages . getTrusted());//定制化customize(factory);return factory;}

在上述代码中,主要是设置配置文件中指定的参数值(如:配置关闭超时时间、发送超时时间)和 定 制 化 操 作 ( 如 遍 历 调 用 构 造 对 象 时 传 入 的 参 数List中元素的 customize 方法)。

下面看基于缓存的 CachingConnectionFactory 的创建源码。

@Configuration(proxyBeanMethods = false)@ConditionalOnClass (CachingConnectionFactory. class)@ConditionalOnProperty(prefix = "spring. jms .cache", name = "enabled", havingValue= "true",matchIfMissing = true )static class CachingConnectionF actoryConfiguration {@BeanCachingConnectionFactory cachingJmsConnect ionFactory (JmsProperties jmsProperties,ActiveMQProperties Froperties,ObjectProvider factoryCusto-mizers) {JmsProperties . Cache cacheProperties = jmsProperties . getCache();CachingConnectionFactory connectionFactory = new CachingConnectionF actory(createJmsConnectionFactory(properties, factoryCustomizers));connectionFactory . setCacheConsumers(cacheProperties . isConsumers());connectionFactory. setCacheProducers(cacheProperties . isProducers());connectionFactory. setSessionCacheSize(cacheProperties . getSessionCacheSize());return connectionFactory;}}

注解部分表明,如果未配置 spring.jms.cache.enabled 或配置其值为 true,均会使该自动配置生效。创建 CachingConnectionFactory 的过程就是 new 一个 CachingConnectionFactory对象,其参数包含上面我们讲到的 ActiveMQConnectionFactory 对象(创建步骤完全一样)和配置属性。然后,根据配置参数设置是否缓存消费者、生产者和预期的缓存大小。

CachingConnectionFactory 是 SingleConnectionFactory 的子类,它添加了会话( Session)缓 存 以 及 MessageProducer 缓 存 。 默 认 情 况 下 , 此 ConnectionFactory 还 会 将“reconnect-OnException" 属 性 切 换 为 "true” , 从 而 允 许 发 生 异 常 时 自 动 恢 复 重 置Connection。

CachingConnectionFactory 默认情况下,只会缓存一个会话,其他进一步的回话请求会按照需要创建并处理。在高并发环境下,要考虑提高“sessionCacheSize'的值。

最后再看一下基于 池化的 ConnectionFactory 的创建。

@Configuration(proxyBeanMethods = false)@Conditional0nClass({ JmsPoolConnectionFactory . class, Pooled0bject. class })static class PooledConnectionF actoryConfiguration {@Bean(destroyMethod = "stop")@Conditiona lOnProperty(prefix = "spring . activemq .pool", name = "enabled" ,havingValue = "true")JmsPoolConnectionFactory pooledJmsConnectionF actory(ActiveMQProperties properties,ObjectProvider factoryCustomizers) {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnection-FactoryFactory(properties,factoryCustomizers . orderedStream(). collect(Collectors.toList())). createConnectionF actory(ActiveMQConnectionFactory.class);return new JmsPoolConnection actoryFactory(properties . getPool()). createPooledConnection actory( connectionFactory);}}

下 面 讲 解 JmsPoolConnectionFactory 的 创 建 步 骤 : 首 先 , 创 建 一 个ActiveMQConnec-tionFactory 对象,与上面创建该类的方法完全一样,不再赘述;随后,创建 JmsPoolConne-ctionFactoryFactory 对象, 构造参数为连接池的配置信息,然后调用对象的 createPooled-ConnectionFactory 方法,将 ActiveMQConnectionFactory 对象传入。

在 createPooledConnectionFactory 方 法 中 主 要 就 是 new 出 一 个JmsPoolConnectionFactory 对象设置 ConnectionFactory 为 ActiveMQConnectionFactory,并进行一些其他配置参数的判断和设置。这里就不再展示相关代码了。

至此,关于 ActiveMQ 自动配置的讲解已经完成。

本文给大家讲解的内容是SpringBoot消息源码解析:ActiveMQ自动配置

  1. 下篇文章给大家讲解的是@JmsL istener 注解解析;
  2. 觉得文章不错的朋友可以转发此文关注小编;
  3. 感谢大家的支持!
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐