Friday, May 28, 2021

spring cloud stream 3.1.2 源码搭配rocketmq学习 (二)

现在我们从源码来分析(一)中所涉及的东西


提问

问一下自己想从源码中知道什么, 带着目的去看源码才容易搞懂.

从下述的代码中发现定义了一个Function的Bean和在yaml中定义了definition, 那么这两个定义的作用是什么呢? Function是怎么样去绑定、注册的呢?

带着问题我们就可以去找对应的实现.

@Beanpublic Function<Flux<Message<String>>, Mono<Void>> consumerEvent() { return flux -> flux.map(message -> {  System.out.println(message.getPayload());  return message; }).then();}
spring: cloud: stream:  function:  definition: consumerEvent

怎么找springboot项目的启动

首先我们看META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfigura:\(...省略一部分)org.springframework.cloud.stream.function.FunctionConfiguration

发现自动装载了一个FunctionConfiguration的类

进到这个类里面看, 发现他注册了一个functionBindingRegistrar的Bean.

看英文---(functionBindingRegistrar) 方法绑定注册, 这好像是我们想知道的东西.

那么接着往下看


functionBindingRegistrar

看传入的参数发现这个Bean是根据StreamFunctionProperties注册的一个的Bean.
// FunctionConfiguration#functionBindingRegistrar@Beanpublic InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) { return new FunctionConfiguration.FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);}

因为这个Bean是InitializingBean所以直接看afterPropertiesSet这个方法

看源码最重要就是抓住主线, 从源码中发现有这样的一段代码.

// 注册了一个Bean定义, functionBindableProxyDefinition.registry.registerBeanDefinition(name, functionBindableProxyDefinition);

咦? 那functionBindableProxyDefinition是一个什么东西呢??

往上找这个的赋值.

// 看到这行, 他是一个 BindableFunctionProxyFactoryfunctionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);// 为构造参数进行了赋值functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(functionDefinition);functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.inputCount);functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.outputCount);functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);

初始化了一个RootBeanDefinition, 并对构造函数进行呢相对应的赋值, 那参数从哪来的呢, 继续看源码.

// 这些参数怎么来的呢// 看到下面这些代码, 对于Function/Supplier/Consumer怎么区分的, 是不是有的清晰的认知FunctionInvocationWrapper function = (FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);Type functionType = function.getFunctionType();if (function.isSupplier()) { this.inputCount = 0; this.outputCount = this.getOutputCount(functionType, true);} else if (function.isConsumer()) { this.inputCount = FunctionTypeUtils.getInputCount(functionType); this.outputCount = 0;} else { this.inputCount = FunctionTypeUtils.getInputCount(functionType); this.outputCount = this.getOutputCount(functionType, false);}

看到上面这一段代码inputCount/outCount是计算出来的, 对于怎么区分Function/Supplier/Consumer是Input还是Output也有了一定的了解

那functionDefinition是什么呢

functionDefinition = var3[var5];

我们发现他是这样赋值的, 再往上看

sourceNames = this.filterEligibleFunctionDefinitions();var3 = sourceNames;var4 = sourceNames.length;

通过阅读发现filterEligibleFunctionDefinitions这个方法里对我们配置文件中Definition进行了解析处理, 并返回了合格的sourcesNames.

哦! 原来Definition的意义在这.

到此这个bean的注册就已经完成了,那就来看看BindableFunctionProxyFactory,
发现他又是一个InitializingBean, 所以上诉代码设置完初始化参数后, 在spring实例化Bean之后会调用afterPropertiesSet方法

public void afterPropertiesSet() { Assert.notEmpty(this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty"); int i; if (this.inputCount > 0) {  for(i = 0; i < this.inputCount; ++i) {   this.createInput(this.buildInputNameForIndex(i));  } } if (this.outputCount > 0) {  for(i = 0; i < this.outputCount; ++i) {   this.createOutput(this.buildOutputNameForIndex(i));  } }}

然后看到这两个函数. 发现绑定名称原来是这样的.

private String buildInputNameForIndex(int index) { return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "in" + "-" + index;}private String buildOutputNameForIndex(int index) { return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "out" + "-" + index;}

以Input为例子, 我们看看createInput的方法.

先从简单的来看, 先不看pollable

this.inputHolders.put(name, new BoundTargetHolder(this.getBindingTargetFactory(SubscribableChannel.class).createInput(name), true));

SubscribableChannel的createInput, 我们找到SubscribableChannelBindingTargetFactory#createInput的

public SubscribableChannel createInput(String name) { DirectWithAttributesChannel subscribableChannel = new DirectWithAttributesChannel(); subscribableChannel.setComponentName(name); subscribableChannel.setAttribute("type", "input"); this.messageChannelConfigurer.configureInputChannel(subscribableChannel, name); if (this.context != null && !this.context.containsBean(name)) {  this.context.registerBean(name, DirectWithAttributesChannel.class, () -> {   return subscribableChannel;  }, new BeanDefinitionCustomizer[0]); } return subscribableChannel;}

发现返回了DirectWithAttributesChannel的一个类, 并且把他注册成为了Bean.

后面把这个类封装在BoundTargetHolder中并放入inputHolders中就结束了Function注册的过程

总结

  1. 启动之后会注册一个FunctionBindingRegistrar的Bean, 在这个Bean中会读取配置文件找到对应的FunctionBean, 处理这个FunctionBean生成注册需要的参数并把这些内容构成一个functionBindableProxyDefinition的Bean.
  2. functionBindableProxyDefinition的Bean处理上述构造函数传入的参数并生成对应的Input/Output的Bean.

至此, funciton的注册就完成了吗. (不!). 其实还没有完成, 细心的朋友会发现 还有functionInitializer的Bean. 下一节我们来看看这个.

Wish. Do.









原文转载:http://www.shaoqun.com/a/764968.html

跨境电商:https://www.ikjzd.com/

徐家骏:https://www.ikjzd.com/w/1803

houzz:https://www.ikjzd.com/w/236


现在我们从源码来分析(一)中所涉及的东西提问问一下自己想从源码中知道什么,带着目的去看源码才容易搞懂.从下述的代码中发现定义了一个Function的Bean和在yaml中定义了definition,那么这两个定义的作用是什么呢?Function是怎么样去绑定、注册的呢?带着问题我们就可以去找对应的实现.@BeanpublicFunction<Flux<Message<String
hemingway:https://www.ikjzd.com/w/2344
敦煌网站:https://www.ikjzd.com/w/189
汇通天下物流:https://www.ikjzd.com/w/2055
天猫国际联合海关启动"仓内留样检":https://www.ikjzd.com/articles/18175
初阶卖家必读:3个步骤,让您的亚马逊广告运营事半功倍!:https://www.ikjzd.com/articles/143206
"2018亚马逊全球开店卖家峰会"现场火爆体验盘点:https://www.ikjzd.com/articles/12575

No comments:

Post a Comment