1

Seata 1.5.2 源码学习 - 废物大师兄

 1 year ago
source link: https://www.cnblogs.com/cjsblog/p/16866796.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Seata 1.5.2 源码学习

文章有点长,我决定用半个小时来和你分享~😂 废话不多说,上代码。。。

874963-20221107171907664-441059075.png

基于Seata 1.5.2,项目中用 seata-spring-boot-starter

1. SeataDataSourceAutoConfiguration

874963-20221107171840926-1361420588.png

SeataDataSourceAutoConfiguration 主要是配置数据源代理,可以看到:

  1. 默认seata.enabled、seata.enableAutoDataSourceProxy、seata.enable-auto-data-source-proxy都是true
  2. 只有当classpath中有DataSource时才会进行此配置
  3. 创建了一个SeataAutoDataSourceProxyCreator,用于自动代理数据源
874963-20221107172628110-926685738.png

先记一下,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor

刚才new了一个SeataAutoDataSourceProxyCreator,继续看构造方法,默认useJdkProxy是false,excludes为空,dataSourceProxyMode是AT

874963-20221107175359347-1977222225.png

构造方法中最重要的一件事情是构造AOP通知(拦截器),这里new了一个SeataAutoDataSourceProxyAdvice

874963-20221107180719002-959863460.png

SeataAutoDataSourceProxyAdvice是一个MethodInterceptor。

MethodInterceptor是aop中的一个接口,当目标方法被调用时就会调用与之关联的MethodInterceptor的invoke方法

874963-20221107181438892-1131873033.png

至此,在构造方法中完成了advisors的赋值,advisors[]中有一个DefaultIntroductionAdvisor,DefaultIntroductionAdvisor中引用了SeataAutoDataSourceProxyAdvice

前面说过,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor,而BeanPostProcessor是BeanFactory中的一个钩子(回调),称之为后置处理器

874963-20221107182724826-1863689240.png

AbstractAutoProxyCreator#postProcessBeforeInstantiation()

874963-20221108112107659-2051647751.png

AbstractAutoProxyCreator#postProcessAfterInitialization()

874963-20221108101721346-1844429461.png

AbstractAutoProxyCreator#wrapIfNecessary()

874963-20221108103117730-1463722344.png

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()

874963-20221108104132547-1628430548.png

SeataAutoDataSourceProxyCreator#getAdvicesAndAdvisorsForBean()

874963-20221108104814084-1771654288.png

SeataAutoDataSourceProxyCreator#wrapIfNecessary()

874963-20221108101731889-492192588.png

DataSourceProxyHolder维护了数据源对象与数据源代理对象的映射

874963-20221108113605353-1249002755.png

至此,数据源代理部分就看完了,下面总结一下:

1、启动的时候自动配置数据源代理,创建了一个SeataAutoDataSourceProxyCreator

2、SeataAutoDataSourceProxyCreator在构造方法中创建AOP通知,并赋值给其属性

3、AbstractAutoProxyCreator是一个抽象类不能被实例化,能实例化的只有SeataAutoDataSourceProxyCreator

4、SeataAutoDataSourceProxyCreator从AbstractAutoProxyCreator那里继承了很多属性和方法,其中就包括postProcessBeforeInstantiation()、postProcessAfterInitialization()、createProxy()等等

5、SeataAutoDataSourceProxyCreator间接实现了BeanPostProcessor接口,也就是说它也是BeanPostProcessor的一个实现类

6、BeanFactory回调所有的BeanPostProcessor#postProcessAfterInitialization()时,就会调用SeataAutoDataSourceProxyCreator的postProcessAfterInitialization()方法,最终会调用wrapIfNecessary()方法

7、wrapIfNecessary()只关心DataSource对象,它负责为DataSource对象生成代理对象,并且在SeataAutoDataSourceProxyCreator中维护了DataSource对象与SeataDataSourceProxy对象之间的映射关心

8、创建代理对象时,会给DataSource对象应用AOP拦截器。用AOP的话来讲,就是给目标对象DataSource织入通知,并创建一个被增强的代理对象

9、通知(拦截器)是SeataAutoDataSourceProxyAdvice,它实现了MethodInterceptor接口

10、SeataAutoDataSourceProxyAdvice#invoke()方法所做的事情就是,拿到原始DataSource的代理对象,并且在代理对象上调用目标方法

综上所述,以上做的所有工作都是为了将来调用 javax.sql.DataSource 上的任意方法时都会被拦截,然后调用其代理对象上对应的方法。而DataSource中最重要的一个方法就是getConnection()

划重点:将来,所有调用 javax.sql.DataSource#getConnection() 都会被拦截,然后在代理对象上执行getConnection(),因此可以这样说

调 javax.sql.DataSource#getConnection() 实际上执行的是 io.seata.rm.datasource.SeataDataSourceProxy#getConnection()

2. SeataAutoConfiguration

SeataAutoConfiguration里面主要是配置GlobalTransactionScanner(全局事务扫描器)

seata.enabled=true 才会开启 SeataAutoConfiguration

874963-20221108144057109-1707155121.png
874963-20221108144219512-2002776459.png

874963-20221108141501369-1891395501.png

874963-20221108144313626-1070422826.png

GlobalTransactionScanner 也继承自 AbstractAutoProxyCreator,同时还实现了InitializingBean接口。BeanFactory在设置了所有bean属性之后会调用InitializingBean的afterPropertiesSet()方法

GlobalTransactionScanner#afterPropertiesSet()

874963-20221108142413614-441205471.png
874963-20221108142820973-69441343.png

io.seata.common.DefaultValues中定义了很多默认值

874963-20221108143124457-1970671247.png

同样地,因为实现了BeanPostProcessor接口,所以在启动时BeanFactory实例化Bean之后,会调用GlobalTransactionScanner的postProcessAfterInitialization(),尽管这个postProcessAfterInitialization()方法时从AbstractAutoProxyCreator那里继承来的,但是不影响啊,还是会调用GlobalTransactionScanner这个bean的postProcessAfterInitialization()方法。于是,最终又会调wrapIfNecessary()方法。

GlobalTransactionScanner#wrapIfNecessary()

874963-20221108162523049-1771318525.png

这里面有一个很重要的逻辑就是,创建了一个GlobalTransactionalInterceptor对象,并赋值给interceptor

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()是一个抽象方法,实现在子类GlobalTransactionScanner中

874963-20221108162936236-1644367648.png
874963-20221108163036709-1892250316.png
874963-20221108163437845-946275007.png

因此,所有在GlobalTransactionScanner#wrapIfNecessary()中被代理的对象,都被应用GlobalTransactionalInterceptor

GlobalTransactionalInterceptor也是一个MethodInterceptor

也就是说,目标方法的调用都会转到GlobalTransactionalInterceptor#invoke()上

874963-20221108170227390-633320580.png

GlobalTransactionalInterceptor#handleGlobalTransaction()

874963-20221108171226128-1252076327.png

事务执行直接调用TransactionalTemplate的execute()方法

io.seata.tm.api.TransactionalTemplate#execute()

874963-20221108174300762-1962189263.png

io.seata.tm.api.GlobalTransactionContext#getCurrent() 获取当前事务

874963-20221108174507238-1965200698.png

io.seata.tm.api.TransactionalTemplate#beginTransaction()

874963-20221108174933239-796752092.png

tx是DefaultGlobalTransaction

io.seata.tm.api.DefaultGlobalTransaction#begin()

DefaultGlobalTransaction中的TransactionManager是DefaultTransactionManager

874963-20221108175408718-1005950442.png

DefaultTransactionManager中提供了事务相关的底层操作

874963-20221108180416158-642873715.png

io.seata.tm.api.DefaultGlobalTransaction#commit()

874963-20221108180848374-1038720566.png

io.seata.tm.api.DefaultGlobalTransaction#rollback()的逻辑与commit()类似,都是重试调用transactionManager.rollback(xid)

全局事务扫描器部分的代码就看到这里,下面总结一下:

1、配置项seata.enabled=true 会触发 SeataAutoConfiguration 自动配置

2、SeataAutoConfiguration中创建了一个GlobalTransactionScanner

3、GlobalTransactionScanner继承了AbstractAutoProxyCreator,并实现InitializingBean接口

4、初始化TM、RM

5、由于继承了AbstractAutoProxyCreator,所以BeanFactory会调用GlobalTransactionScanner#方法postProcessAfterInitialization(),最终会调用GlobalTransactionScanner#wrapIfNecessary()来为目标对象创建代理对象

6、GlobalTransactionScanner#wrapIfNecessary()中创建了一个GlobalTransactionalInterceptor,GlobalTransactionalInterceptor是一个MethodInterceptor

7、在创建代理对象的时候,在AbstractAutoProxyCreator#wrapIfNecessary()方法中,为代理对象应用GlobalTransactionalInterceptor,于是所有目标对象上的方法调用就会转为调用GlobalTransactionalInterceptor#invoke()

8、GlobalTransactionalInterceptor#invoke()方法中,首先获取被调用的目标对象的Class和Method对象,然后检查目标方法或类上是否有@GlobalTransactional或@GlobalLock注解,而且配置项中不能禁用全局事务

9、如果加了@GlobalTransactional注解,则创建一个AspectTransactional,然后开始处理全局事务,默认传播特性是REQUIRED

10、如果加了@GlobalLock注解,则开始处理全局锁

11、处理全局事务就是直接调用事务模板中的execute方法,TransactionalTemplate#execute()是一个模板方法,其中定义了事务处理的流程。首先开启事务,然后执行业务逻辑,最后提交事务,异常回滚事务。

12、事务操作是在DefaultGlobalTransaction中处理的,最终处理在DefaultTransactionManager。DefaultTransactionManager负责同步远程调用,向TC发请求来开启、提交、回滚事务等操作

874963-20221108185229605-402816916.png

3. 数据库操作执行SQL语句

通过Java自带的JDBC操作数据库通常是这样的:

Class.forName(driverClass);
// 获取Connection
Connection connection = DriverManager.getConnection(url,user,password);
// 创建Statement或者PreparedStatement
Statement stmt = connection.createStatement();
stmt.execute(sql);

// PreparedStatement ps = connection.prepareStatement(sql);
// ps.execute();

MyBatis底层也是这一套

接下来看Seata是如何做的

首先是获取数据库连接Connection,前面已经说过了,调用DataSource的getConnection()方法底层是在代理对象SeataDataSourceProxy上调用getConnection()。SeataDataSourceProxy是接口,如果是AT模式,则这个数据源代理对象是DataSourceProxy

874963-20221109110844980-1838961946.png
874963-20221109111156350-1393397445.png

DataSourceProxy#getConnection()获取数据库连接

874963-20221109111703898-678048951.png
874963-20221109111901428-205267395.png

ConnectionProxy#createStatement()

874963-20221109112444597-1024823.png
874963-20221109112626940-358549685.png
874963-20221109112752094-1343758169.png
874963-20221109112853646-218547298.png

ConnectionProxy#prepareStatement()

874963-20221109113023330-2098681247.png
874963-20221109113159042-109228636.png

PreparedStatementProxy 继承自 StatementProxy,因此下面就直接看PreparedStatementProxy如何执行SQL

PreparedStatementProxy#execute()

874963-20221109114150721-1866370946.png

ExecuteTemplate#execute()  是一个模板方法

874963-20221109115603531-971728161.png

挑一个看看吧,就挑UpdateExecutor

874963-20221109115828447-245939565.png
874963-20221109115948591-301533812.png
874963-20221109120142868-1383066187.png

UpdateExecutor构造方法中一直调父类的构造法,既然如此,那么直接看BaseTransactionalExecutor

874963-20221109120236766-1593801622.png

UpdateExecutor#execute()

这个方法时从BaseTransactionalExecutor那里继承来的,又是一个模板方法,可见设计模式是多么重要

874963-20221109120806284-78839760.png

AbstractDMLBaseExecutor#doExecute()

874963-20221109121658988-616343043.png

AbstractDMLBaseExecutor#executeAutoCommitTrue()

874963-20221109143705552-1362209246.png

ConnectionProxy#changeAutoCommit()

874963-20221109154515729-1559986124.png

现在事务自动提交已经被Seata改成false了

874963-20221109151229123-1861551283.png

UpdateExecutor#beforeImage()

874963-20221109150453822-1590066217.png

BaseTransactionalExecutor#prepareUndoLog()

874963-20221109152235456-1978807148.png

接下来,提交事务

ConnectionProxy#commit()

874963-20221109154627260-174126513.png

ConnectionProxy#processGlobalTransactionCommit() 处理全局事务提交

874963-20221109162235869-1577977412.png

分支事务提交以后,业务数据更改和undo_log就都提交了

回想一下,为什么在执行业务修改前要先将默认的自动提交改成手动提交,最后再改成自动提交呢?

因为,要将业务数据修改和插入undo_log放在同一个事务里,一起提交

这一切都归功于代理

回顾一下整个调用链

874963-20221109173430529-1001698251.png

结合之前的案例,AT模式TC、TM、RM三者的交互应该是这样的:

874963-20221109184652811-515068922.png

问题一:为什么在执行的时候,先将数据库自动提交autoCommit设为false,最后再改成true呢?

答:因为,需要将undo_log和业务数据修改放到同一个事务中,这样可以保证业务数据修改成功后undo_log必然插入成功,所以Seata要将其改为手动提交。最后再改成true是因为默认autoCommit就是true,这样可以不影响其它业务。

问题二:什么情况下ConnectionContext中xid=null,且isGlobalLockRequire=true呢?或者换一种问法,什么情况下不在全局事务中,当仍然需要全局锁呢?

答:当业务方法上不加@GlobalTransactional,而是只加了@GlobalLock注解的情况下,就会出现上述情况,也就会执行 ConnectionProxy#processLocalCommitWithGlobalLocks()方法,在事务提交前检查全局锁,这样设计的目的是在AT模式下,不出现脏读、脏写。由于数据源被代理了,当一个加了@GlobalTransactional的全局事务,与另一个加了@GlobalTransactional或@GlobalLock注解的事务在本地事务提交前就会检查全局锁,要先获得全局锁才能提交本地事务,这样就避免了脏读脏写,从而相当于实现了全局事务的读已提交隔离级别。参见:https://seata.io/zh-cn/blog/seata-at-lock.html

关于Seata 1.5.2 Client端的源码学习就先到这里,欢迎交流~

如果你都已经看到了这里,不妨给我点个赞吧😄

感谢您的阅读,如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!
欢迎各位转载,但必须在文章页面中给出作者和原文链接!

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK