官方地址:

通过idea打开版本的源码

其实在之前的应用课程中,我们已经用过AT模式,同时也写过一个小的Demo,那么这里其实我们主要要分析的是AT模式官方文档中的一些内容
官方文档:
2.1写隔离一阶段本地事务提交前,需要确保先拿到「全局锁」。
拿不到「全局锁」,不能提交本地事务。
拿「全局锁」的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
图解:

如果tx1的二阶段全局回滚,则tx1需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果tx2仍在等待该数据的「全局锁」,同时持有本地锁,则tx1的分支回滚会失败。分支的回滚会一直重试,直到tx2的「全局锁」等锁超时,放弃「全局锁」并回滚本地事务释放本地锁,tx1的分支回滚最终成功。
因为整个过程「全局锁」在tx1结束前一直是被tx1持有的,所以不会发生「脏写」的问题。
2.2读隔离在数据库本地事务隔离级别「读已提交(ReadCommitted)」或以上的基础上,Seata(AT模式)的默认全局隔离级别是「读未提交(ReadUncommitted)」。
如果应用在特定场景下,必须要求全局的「读已提交」,目前Seata的方式是通过SELECTFORUPDATE语句的代理。
图解:

SELECTFORUPDATE语句的执行会申请「全局锁」,如果「全局锁」被其他事务持有,则释放本地锁(回滚SELECTFORUPDATE语句的本地执行)并重试。这个过程中,查询是被block住的,直到「全局锁」拿到,即读取的相关数据是「已提交」的,才返回。
出于总体性能上的考虑,Seata目前的方案并没有对所有SELECT语句都进行代理,仅针对FORUPDATE的SELECT语句。
2.3AT二阶段一阶段:
解析SQL:得到SQL的类型(UPDATE),表(product),条件(wherename='TXC')等相关的信息。
查询前镜像(改变之前的数据):根据解析得到的条件信息,生成查询语句,定位数据。
执行业务SQL:更新这条数据。
查询后镜像(改变后的数据):根据前镜像的结果,通过「主键」定位数据。
插入回滚日志:把前后镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到UNDO_LOG表中。
提交前,向TC注册分支:申请「全局锁」。
本地事务提交:业务数据的更新和前面步骤中生成的UNDOLOG一并提交。
将本地事务提交的结果上报给TC。
二阶段-回滚:
收到TC的分支回滚请求,开启一个本地事务,执行如下操作。
通过XID和BranchID查找到相应的UNDOLOG记录。
根据UNDOLOG中的前镜像和业务SQL的相关信息生成并执行回滚的语句:
提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给TC。
二阶段-提交:
收到TC的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给TC。
异步任务阶段的分支提交请求将异步和批量地删除相应UNDOLOG记录。
二、Seata源码剖析1.Seata客户端启动首先一个Seata的客户端启动一般分为几个流程:
自动加载各种Bean及配置信息
初始化TM
初始化RM(具体服务)
初始化分布式事务客户端完成,代理数据源
连接TC(Seata服务端),注册RM,注册TM
开启全局事务
在这个其中,就会涉及到几个核心的类型,首先我们需要来看配置类型GlobalTransactionAutoConfiguration
所以我们直接通过官方案例引入的Seata包,找到SpringBoot项目在启动的时候自动扫描加载类型的,然后找到GlobalTransactionAutoConfiguration(Seata自动配置类)
2.全局事务扫描类源码这个类型的核心点,就是加载配置,注入相关的Bean
/***seata自动配置类*/@Configuration@EnableConfigurationProperties()publicclassGlobalTransactionAutoConfiguration{privatefinalApplicationContextapplicationContext;privatefinalSeataPropertiesseataProperties;publicGlobalTransactionAutoConfiguration(ApplicationContextapplicationContext,SeataPropertiesseataProperties){=applicationContext;=seataProperties;}//注入全局事务扫描器@BeanpublicGlobalTransactionScannerglobalTransactionScanner(){StringapplicationName=().getProperty("");StringtxServiceGroup=();if((txServiceGroup)){txServiceGroup=applicationName+"-fescar-service-group";(txServiceGroup);}//构建全局扫描器,传入参数:应用名、事务分组名,失败处理器returnnewGlobalTransactionScanner(applicationName,txServiceGroup);}}3.GlobalTransactionScanner全局事务扫描器在这其中我们要关心的是GlobalTransactionScanner这个类型,这个类型扫描@GlobalTransactional注解,并对代理方法进行拦截增强事务的功能。

这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,其实这个方法就是判断Bean对象是否需要代理,是否需要增强
protectedObjectwrapIfNecessary(Objectbean,StringbeanName,ObjectcacheKey){if(beanName!=(beanName)){returnbean;}if(((cacheKey))){returnbean;}if(isInfrastructureClass(())||shouldSkip((),beanName)){(cacheKey,);returnbean;}//[]specificInterceptors=getAdvicesAndAdvisorsForBean((),beanName,null);if(specificInterceptors!=DO_NOT_PROXY){(cacheKey,);Objectproxy=createProxy((),beanName,specificInterceptors,newSingletonTargetSource(bean));(cacheKey,());returnproxy;}(cacheKey,);returnbean;}当然这是父类提供的方法,那子类继承之后重写此方法,完成了定制化的效果
@OverrideprotectedObjectwrapIfNecessary(Objectbean,StringbeanName,ObjectcacheKey){try{//加锁防止并发synchronized(PROXYED_SET){if(PROXYED_(beanName)){returnbean;}interceptor=null;//checkTCCproxy//检查是否是TCC模式if((bean,beanName,applicationContext)){//TCCinterceptor,proxybeanofsofa:reference/dubbo:reference,andLocalTCC//如果是:添加TCC拦截器interceptor=newTccActionInterceptor((beanName));(_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);}else{//不是TCC模式Class?serviceInterface=(bean);Class?[]interfacesIfJdk=(bean);//判断是否有相关事务注解,如果没有就不代理if(!existsAnnotation(newClass[]{serviceInterface})!existsAnnotation(interfacesIfJdk)){returnbean;}//当发现存在全局事务注解标注的Bean,添加拦截器if(globalTransactionalInterceptor==null){//添加拦截器globalTransactionalInterceptor=newGlobalTransactionalInterceptor(failureHandlerHook);(_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor=globalTransactionalInterceptor;}("Bean[{}]withname[{}]woulduseinterceptor[{}]",().getName(),beanName,().getName());//检查是否是代理对象if(!(bean)){//不是调用Spring代理(父级)bean=(bean,beanName,cacheKey);}else{//已经是代理对象,反射获取代理类中的已经存在的拦截器组合,然后添加到该集合当中AdvisedSupportadvised=(bean);Advisor[]advisor=buildAdvisors(beanName,getAdvicesAndAdvisorsForBean(null,null,null));for(Advisoravr:advisor){(0,avr);}}//将Bean添加到Set中PROXYED_(beanName);returnbean;}}catch(Exceptionexx){thrownewRuntimeException(exx);}}图解地址:
三、Seata源码分析-2PC核心源码解读1.2PC提交源码流程上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些@GlobalTransactional注解的判断,如果有注解以后,会执行全局事务和本地事务,那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,这里主要是获取事务信息
ObjecthandleGlobalTransaction(finalMethodInvocationmethodInvocation,finalGlobalTransactionalglobalTrxAnno)throwsThrowable{booleansucceed=true;try{(newTransactionalExecutor(){@OverridepublicObjectexecute()throwsThrowable{();}//获取事务名称,默认获取方法名publicStringname(){Stringname=();if(!(name)){returnname;}returnformatMethod(());}/***解析GlobalTransactional注解属性,封装为对象*@return*/@OverridepublicTransactionInfogetTransactionInfo(){//resetthevalueoftimeout//获取超时时间,默认60秒inttimeout=();if(timeout=0||timeout==DEFAULT_GLOBAL_TRANSACTION_TIMEOUT){timeout=defaultGlobalTransactionTimeout;}//构建事务信息对象TransactionInfotransactionInfo=newTransactionInfo();(timeout);//超时时间(name());//事务名称(());//事务传播(());//校验或占用全局锁重试间隔(());//校验或占用全局锁重试次数SetRollbackRulerollbackRules=newLinkedHashSet();//其他构建信息for(Class?rbRule:()){(newRollbackRule(rbRule));}for(StringrbRule:()){(newRollbackRule(rbRule));}for(Class?rbRule:()){(newNoRollbackRule(rbRule));}for(StringrbRule:()){(newNoRollbackRule(rbRule));}(rollbackRules);returntransactionInfo;}});}catch(){//执行异常=();switch(code){caseRollbackDone:();caseBeginFailure:succeed=false;((),());();caseCommitFailure:succeed=false;((),());();caseRollbackFailure:((),());();caseRollbackRetrying:((),());();default:thrownewShouldNeverHappenException((":%s",code));}}finally{if(degradeCheck){EVENT_(newDegradeCheckEvent(succeed));}}}其实这个方法主要的作用就是,执行事务的流程,大概一下几点:
获取事务信息
开始执行全局事务
发生异常全局回滚,各个数据通过undo_log表进行事务补偿
全局事务提交
清除所有资源
这个位置是非常核心的一个位置,因为我们所有的业务进来以后都会走这个位置。
这其中的第三步和第四步就是在想TC(Seata-Server)发起全局事务的提交/回滚
publicObjectexecute(TransactionalExecutorbusiness)throwsThrowable{//1.GettransactionInfo//获取事务信息TransactionInfotxInfo=();if(txInfo==null){thrownewShouldNeverHappenException("transactionInfodoesnotexist");}//1.1Getcurrenttransaction,ifnotnull,thetxroleis''.//获取当前事务,主要获取XidGlobalTransactiontx=();//1.2Handlethetransactionpropagation.//根据配置的不同事务传播行为,执行不同的逻辑Propagationpropagation=();SuspedResourcesHoldersuspedResourcesHolder=null;try{switch(propagation){caseNOT_SUPPORTED://Iftransactionisexisting,(existingTransaction(tx)){suspedResourcesHolder=();}//();caseREQUIRES_NEW://Iftransactionisexisting,suspit,(existingTransaction(tx)){suspedResourcesHolder=();tx=();}//Continueandexecutewithnewtransactionbreak;caseSUPPORTS://Iftransactionisnotexisting,(notExistingTransaction(tx)){();}//Continueandexecutewithnewtransactionbreak;caseREQUIRED://Ifcurrenttransactionisexisting,executewithcurrenttransaction,//;caseNEVER://Iftransactionisexisting,(existingTransaction(tx)){thrownewTransactionException(("Existingtransactionfoundfortransactionmarkedwithpropagation'never',xid=%s",()));}else{//();}caseMANDATORY://Iftransactionisnotexisting,(notExistingTransaction(tx)){thrownewTransactionException("Noexistingtransactionfoundfortransactionmarkedwithpropagation'mandatory'");}//;default:thrownewTransactionException("NotSupportedPropagation:"+propagation);}//1.3Ifnull,createnewtransactionwithrole''.//当前没有事务,则创建一个新的事务if(tx==null){tx=();}//setcurrenttxconfigtoholderGlobalLockConfigpreviousConfig=replaceGlobalLockConfig(txInfo);try{//2.Ifthetxroleis'',stherequestofbeginTransactiontoTC,//,thehookswillstillbetriggered.//开始执行全局事务beginTransaction(txInfo,tx);Objectrs;try{//DoYourBusiness//执行当前业务逻辑://1.在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据//2.执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中//3.远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表//4.会在lock_table表中插入全局锁数据(一个分支一条)rs=();}catch(Throwableex){//3.Theneededbusinessexceptiontorollback.//发生异常全局回滚,各个数据通过undo_log表进行事务补偿completeTransactionAfterThrowing(txInfo,tx,ex);throwex;}//4.everythingisfine,commit.//全局提交事务commitTransaction(tx);returnrs;}finally{//5.clear//清除所有资源resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}}finally{//Ifthetransactionissusped,(suspedResourcesHolder!=null){(suspedResourcesHolder);}}}2.如何发起全局事务这个位置我们就看当前这个代码中的beginTransaction(txInfo,tx);方法
//想TC发起请求,这里采用了模板模式privatevoidbeginTransaction(TransactionInfotxInfo,GlobalTransactiontx){try{triggerBeforeBegin();//对TC发起请求((),());triggerAfterBegin();}catch(TransactionExceptiontxe){(tx,txe,);}}那我们向下来看begin方法,那要注意,这里调用begin方法的是DefaultGlobalTransaction
@Overridepublicvoidbegin(inttimeout,Stringname)throwsTransactionException{//判断调用者是否是TMif(role!=){assertXIDNotNull();if(()){("IgnoreBegin():justinvolvedinglobaltransaction[{}]",xid);}return;}assertXIDNull();StringcurrentXid=();if(currentXid!=null){thrownewIllegalStateException("Globaltransactionalreadyexists,"+"can'tbeginanewglobaltransaction,currentXid="+currentXid);}//获取Xidxid=(null,null,name,timeout);status=;(xid);if(()){("Beginnewglobaltransaction[{}]",xid);}}在向下来看begin方法,这时候使用的是(默认事务管理者),来真正的获取xid,其中就是传入事务的相关信息,最终TC端返回对应的全局事务Xid。
@OverridepublicStringbegin(StringapplicationId,StringtransactionServiceGroup,Stringname,inttimeout)throwsTransactionException{GlobalBeginRequestrequest=newGlobalBeginRequest();(name);(timeout);//发送请求得到响应GlobalBeginResponseresponse=(GlobalBeginResponse)syncCall(request);if(()==){thrownewTmTransactionException(,());}//返回();}这里采用的是Netty的通讯方式
privateAbstractTransactionResponsesyncCall(AbstractTransactionRequestrequest)throwsTransactionException{try{//通过Netty发送请求return(AbstractTransactionResponse)().sSyncRequest(request);}catch(TimeoutExceptiontoe){thrownewTmTransactionException(,"RPCtimeout",toe);}}四、Seata源码分析-数据源代理上节课我们分析了整体的Seata-AT模式的2PC执行流程,那么这节课我们要分析的就是在AT模式中的关键点,数据源代理
1.AT模式的核心点:获取全局锁、开启全局事务
解析SQL并写入undolog
那么上节课其实我们已经把第一步分析清楚了,那么这节课我们就要分析的是AT模式如何解析SQL并写入undolog,首先我们要先明确实际上Seata其中采用了数据源代理的模式。
那么这个就需要我们在回顾一下GlobalTransactionScanner这个类型,在这个类型中继承了一些的接口和抽象类,比较关键的几个:
AbstractAutoProxyCreator
InitializingBean
ApplicationContextAware
DisposableBean
这里给大家回顾一下:
继承ApplicationContextAware类型以后,需要实现对应的方法:voidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException当spring启动完成后,会自动调用这个类型,把ApplicationContext给bean。也就是说,GlobalTransactionScanner天然能拿到Spring的环境。
继承了InitializingBean接口,需要实现一个方法:voidafterPropertiesSet()throwsException;凡是继承该接口的类,在初始化bean的时候,当所有properties都设置完成后,会执行该方法。
继承DisposableBean,需要实现一个方法:voiddestroy()throwsException;和InitializingBean接口相反,这个是在销毁的时候会调用这个方法。
AbstractAutoProxyCreator就比较复杂了,它Spring实现AOP的一种方式。本质上是一个BeanPostProcessor,他在bean初始化之前,调用内部的createProxy方法,创建一个bean的AOP代理bean并返回,对Bean的增强。
总结一下:总体的逻辑就是,GlobalTransactionScanner扫描有注解的bean,做AOP增强。
2.数据源代理关于数据源代理这里我们
全局事务拦截成功后最终还是执行了业务方法的,但是由于Seata对数据源做了代理,所以sql解析与undolog入库操作是在数据源代理中执行的,箭头处的代理就是Seata对DataSource,Connection,Statement做的代理封装类

数据源代理是非常重要的一个环节。我们知道,在分布式事务运行过程中,undolog等的记录、资源的锁定等,都是用户无感知的,因为这些操作都在数据源的代理中完成了。
3.数据源代理DataSourceProxyDataSourceProxy的主要功能为,它在构造方法中调用了一个自定义的init方法,主要做了以下能力的增强:
为每个数据源标识了资源组ID
如果配置打开,会有一个定时线程池定时更新表的元数据信息并缓存到本地
生成代理连接ConnectionProxy
那我们先来看init方法
privatevoidinit(DataSourcedataSource,StringresourceGroupId){//资源组ID,默认是“default”这个默认值=resourceGroupId;try(Connectionconnection=()){//根据原始数据源得到JDBC连接和数据库类型jdbcUrl=().getURL();dbType=(jdbcUrl);if((dbType)){userName=().getUserName();}}catch(SQLExceptione){thrownewIllegalStateException("cannotinitdataSource",e);}().registerResource(this);if(ENABLE_TABLE_META_CHECKER_ENABLE){//如果配置开关打开,会定时线程池不断更新表的元数据信息/***每分钟查询一次数据源的表结构信息并缓存,在需要查询数据库结构时会用到,不然每次去数据库查询结构效率会很低。*/(()-{try(Connectionconnection=()){(()).refresh(connection,());}catch(Exceptionignore){}},0,TABLE_META_CHECKER_INTERVAL,);}//Setthedefaultbranchtypeto'AT'(());}这3个增强里面,前两个都比较容易理解,第三是最重要的。我们知道在AT模式里面,会自动记录undolog、资源锁定等等,都是通过ConnectionProxy完成的。
另外,DataSourceProxy重写了几个方法。
重点是getConnection,此时会返回一个ConnectionProxy,而不是原生的Connection
@OverridepublicConnectionProxygetConnection()throwsSQLException{ConnectiontargetConnection=();returnnewConnectionProxy(this,targetConnection);}@OverridepublicConnectionProxygetConnection(Stringusername,Stringpassword)throwsSQLException{ConnectiontargetConnection=(username,password);returnnewConnectionProxy(this,targetConnection);}4.ConnectionProxy分析ConnectionProxy继承了AbstractConnectionProxy。一看到Abstract,就知道它的父类封装了很多通用工作。它的父类里面还使用了PreparedStatementProxy、StatementProxy、DataSourceProxy。

我们先来分析AbstractConnectionProxy
5.AbstractConnectionProxy@OverridepublicStatementcreateStatement()throwsSQLException{//调用真实连接对象获得Statement对象StatementtargetStatement=getTargetConnection().createStatement();//创建Statement的代理returnnewStatementProxy(this,targetStatement);}@OverridepublicPreparedStatementprepareStatement(Stringsql)throwsSQLException{//数据库类型,比如mysql、oracle等StringdbType=getDbType();//+PreparedStatementtargetPreparedStatement=null;//如果是AT模式且开启全局事务,那么就会进入if分支if(==()){ListSQLRecognizersqlRecognizers=(sql,dbType);if(sqlRecognizers!=()==1){SQLRecognizersqlRecognizer=(0);if(sqlRecognizer!=()==){//得到表的元数据TableMetatableMeta=(dbType).getTableMeta(getTargetConnection(),(),getDataSourceProxy().getResourceId());//得到表的主键列名String[]pkNameArray=newString[().size()];().toArray(pkNameArray);targetPreparedStatement=getTargetConnection().prepareStatement(sql,pkNameArray);}}}if(targetPreparedStatement==null){targetPreparedStatement=getTargetConnection().prepareStatement(sql);}//创建PreparedStatementProxy代理returnnewPreparedStatementProxy(this,targetPreparedStatement,sql);}6.分布式事务SQL执行在这两个代理对象中,执行SQL语句的关键方法如下:
@OverridepublicResultSetexecuteQuery(Stringsql)throwsSQLException{=sql;(this,(statement,args)-((String)args[0]),sql);}@OverridepublicintexecuteUpdate(Stringsql)throwsSQLException{=sql;(this,(statement,args)-((String)args[0]),sql);}@Overridepublicbooleanexecute(Stringsql)throwsSQLException{=sql;(this,(statement,args)-((String)args[0]),sql);}其他执行SQL语句的方法与上面三个方法都是类似的,都是调用方法,未完待续.