6

TCC-Transaction源码解析之事务执行

 3 years ago
source link: http://wuwenliang.net/2019/09/09/TCC-Transaction%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E4%B9%8B%E4%BA%8B%E5%8A%A1%E6%89%A7%E8%A1%8C/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
TCC-Transaction源码解析之事务执行 | 朝·闻·道

TCC分布式事务解决方案在开源界的主要实现为Byte-TCC、TCC-Transaction等。其中笔者了解较多并且业界使用率较高的为TCC-Transaction这一实现。

本文,我将带领读者对TCC-Transaction这一分布式事务框架进行一次源码解析,提高自己的阅读源码的能力,也希望能够对读者深入了解TCC-Transaction有所帮助。

源码地址为 https://github.com/changmingxie/tcc-transaction,我们关注最新版本1.2.x。

源码下载后导入IDEA中,项目目录结构如下图:

代码结构

代码结构

模块及其对应职责说明如下:

tcc-transaction
    |-transaction-tcc-api                   框架API定义,公共类/核心实体定义/枚举/工具类等
    |-transaction-tcc-core                  框架核心逻辑
    |-transaction-tcc-dubbo                 框架整合Dubbo实现
    |-transaction-tcc-spring                框架Spring整合,包含获取数据库连接/切面获取等
    |-transaction-tcc-server                后台管理页面,对事务进行手工重试等
    |-transaction-tcc-unit-test             单元测试
    |-transaction-tcc-tutorial-sample       样例工程
        |-tcc-transaction-dubbo-sample
        |-tcc-transaction-http-sample
        |-tcc-transaction-sample-domain
        |-tcc-transaction-server-sample

项目核心模块为 tcc-transaction-core,它实现了TCC核心业务逻辑,也是本次源码解析的重点对象。

我们从Dubbo使用样例入手进行分析,关于如何使用TCC-Transaction的更多说明,请参照官方文档: 使用指南1.2.x

从一个简单的样例入手

我们从一个调用案例入手开始进行分析,样例路径为org.mengyun.tcctransaction.sample.dubbo.order.service.PaymentServiceImpl。

@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", 
asyncConfirm = false, 
delayCancelExceptions = {SocketTimeoutException.class, com.alibaba.dubbo.remoting.TimeoutException.class})
public void makePayment(@UniqueIdentity String orderNo, Order order,
                 BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
    System.out.println("order try make payment called.time seq:" + 
                DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

    //check if the order status is DRAFT, if no, means that another call makePayment 
                for the same order happened, ignore this call makePayment.
    if (order.getStatus().equals("DRAFT")) {
        order.pay(redPacketPayAmount, capitalPayAmount);
        try {
            orderRepository.updateOrder(order);
        } catch (OptimisticLockingFailureException e) {
            //ignore the concurrently update order exception, ensure idempotency.
        }
    }

    String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
    String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
}
...省略confirmMakePayment实现...
...省略cancelMakePayment实现...

这段代码为模拟支付扣款操作,可以看到在方法上添加了@Compensable注解,它是TCC-Transaction框架的核心注解,作用为:开启tcc事务支持,注解可以设置一下参数

参数名 描述 propagation 事务传播属性,REQUIRED(必须存在事务,不存在则进行创建),SUPPORTS(如果有事务则在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在是否都创建新的事务) confirmMethod confirm阶段方法实现 cancelMethod cancel阶段方法实现 transactionContextEditor 设置transactionContextEditor asyncConfirm 是否使用异步confirm asyncCancel 是否使用异步cancel

解析注解@Compensable

看到了@Compensable注解以及对应的confirm、cancle方法,处于技术敏感,我们可以猜测在框架中一定存在切面逻辑对@Compensable进行拦截并处理;在切面逻辑中一定有对confirm、cancel方法的调用。从这个猜想出发,我们通过阅读相关代码去验证自己的猜想。

我们进入tcc-transaction-core模块的代码目录,目录结构如下:

org.mengyun.tcctransaction
    |-common
    |-context
    |-interceptor           TCC事务拦截器
    |-recover               TCC事务补偿
    |-repository            事务存储
    |-serializer
    |-support
    |-utils

我们主要关注interceptor目录,该目录下的interceptor实现了对注解@Compensable的解析以及对事务的代理逻辑。

CompensableTransactionAspect

CompensableTransactionAspect切面主要实现了对@Compensable的解析以及对事务的代理。

@Aspect
public abstract class CompensableTransactionAspect {

    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    public void setCompensableTransactionInterceptor(
                CompensableTransactionInterceptor compensableTransactionInterceptor) {
        this.compensableTransactionInterceptor = compensableTransactionInterceptor;
    }

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {

    }

    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

    public abstract int getOrder();
}

CompensableTransactionAspect的实现类为ConfigurableTransactionAspect.java, 加载顺序order= Ordered.HIGHEST_PRECEDENCE(-2147483648)。

该切面对标注了@Compensable的方法进行拦截,通过@Around为业务方法添加环绕增强。可以看到具体的增强方法实现为CompensableTransactionInterceptor.interceptCompensableMethod(pjp);

CompensableTransactionInterceptor.interceptCompensableMethod(pjp);

接着上述的分析,我们看一下CompensableTransactionInterceptor.interceptCompensableMethod(pjp)的逻辑。

[CompensableTransactionInterceptor.java]
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

    // 初始化一个TCC方法执行上下文
    CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);
    // 校验事务支持是否开启
    boolean isTransactionActive = transactionManager.isTransactionActive();
    // 校验事务隔离级别
    if (!TransactionUtils.isLegalTransactionContext(
                isTransactionActive, compensableMethodContext)) {
        throw new SystemException
            ("no active compensable transaction while propagation is mandatory for method " 
                    + compensableMethodContext.getMethod().getName());
    }
    // 根据事务方法类型判断执行哪个逻辑
    switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
        case ROOT:
            return rootMethodProceed(compensableMethodContext);
        case PROVIDER:
            return providerMethodProceed(compensableMethodContext);
        default:
            return pjp.proceed();
    }
}

我们主要关注switch代码段

switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
    case ROOT:
        //处理主事务切面,即:本次事务的入口方法
        return rootMethodProceed(compensableMethodContext);
    case PROVIDER:
        //处理提供者事务切面
        return providerMethodProceed(compensableMethodContext);
    default:
        //消费者事务直接执行,会对应执行远端提供者事务切面
        return pjp.proceed();
}

当事务方法为ROOT方法(即分布式事务的主方法)时,执行rootMethodProceed(compensableMethodContext);方法为PROVIDER(提供者)方法时,执行providerMethodProceed(compensableMethodContext)。默认为消费者事务,则直接执行。

我们以此看一下这几种事务切面的执行逻辑。

rootMethodProceed(compensableMethodContext)

对于事务的Root方法,执行rootMethodProceed逻辑,代码逻辑:

private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) 
    throws Throwable {

    Object returnValue = null;

    Transaction transaction = null;

    boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

    boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

    Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
    allDelayCancelExceptions.addAll(this.delayCancelExceptions);
    allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));

    try {
        // 创建事务, 将主事务的信息写入db或者zk或者redis中去,事务信息写入具体方式可配置
        transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

        try {
            // 执行完成之后会马上进到另外一个切面中去
            returnValue = compensableMethodContext.proceed();
        } catch (Throwable tryingException) {
            // 如果try失败,则进行回滚
            if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {

                logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
                // 回滚事务
                transactionManager.rollback(asyncCancel);
            }

            throw tryingException;
        }
        // 提交事务
        transactionManager.commit(asyncConfirm);

    } finally {
        // 最终如果执行成功,则删除之前的事务记录;如果执行失败则不作任何处理,等待job进行补偿操作
        transactionManager.cleanAfterCompletion(transaction);
    }
    return returnValue;
}

注意关注这段代码

// 执行完成之后会马上进到另外一个切面中去
returnValue = compensableMethodContext.proceed();

当所有的切面都执行完成之后才会执行后续的逻辑,也就是真正执行业务方法。

该方法为一个典型的模板方法,对事务通过begin、commit、rollback进行了抽象。

我们进入三个方法详细的分析。

begin()

首先进入begin方法

[TransactionManager.java]
public Transaction begin(Object uniqueIdentify) {
    // 0
    Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
    // 1
    transactionRepository.create(transaction);
    // 2
    registerTransaction(transaction);
    return transaction;
}

0.首先声明并初始化一个分布式事务对象Transaction,标记为ROOT事务,事务初始状态为TRYING。这里采用了经典的状态机策略

public Transaction(TransactionType transactionType) {
    this.xid = new TransactionXid();
    // 事务初始状态设置成TRYING
    this.status = TransactionStatus.TRYING;
    this.transactionType = transactionType;
}   

1.将事务信息存储到数据源中,数据源可以是数据库、redis、zk等,可配置;TransactionRepository是具体的持久化策略的抽象

2.注册事务,在TransactionManager中,通过双向队列(Deque)实现事务栈功能,用来处理嵌套事务。通过对Deque声明为为ThreadLocal,所以对每个线程而言,事务栈都都是独立的

private static final ThreadLocal> CURRENT = new ThreadLocal>();

commit()

接着看一下commit()方法

[TransactionManager.java]
public void commit(boolean asyncCommit) {

    // 从ThreadLocal中获取当前事务
    final Transaction transaction = getCurrentTransaction();
    // 设置事务状态为CONFIRMING
    transaction.changeStatus(TransactionStatus.CONFIRMING);
    // 更新存储中的事务信息
    transactionRepository.update(transaction);

    // 如果异步commit属性为true
    if (asyncCommit) {
        try {
            Long statTime = System.currentTimeMillis();
            // 通过本地线程池异步进行事务提交
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    commitTransaction(transaction);
                }
            });
            logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
        } catch (Throwable commitException) {
            logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
    } else {
        // 否则同步进行事务提交
        commitTransaction(transaction);
    }
}

commit(boolean asyncCommit)方法执行事务的提交过程,具体提交逻辑在commitTransaction(transaction)中完成。

[TransactionManager.java]
private void commitTransaction(Transaction transaction) {
    try {
        // 提交事务
        transaction.commit();
        // 删除本次提交的本地事务记录,如果commit异常,不会把数据库内事务记录删除,
        // 通过job重试进行补偿
        transactionRepository.delete(transaction);
    } catch (Throwable commitException) {
        logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
        throw new ConfirmingException(commitException);
    }
}

[Transaction.java]
public void commit() {
    // 对每一个分支执行提交操作
    for (Participant participant : participants) {
        participant.commit();
    }
}

可以看到,在事务提交完成之后,对本地持久化的事务记录进行了物理删除,具体删除方式取决于持久化机制。感兴趣的同学可以自行查看 org.mengyun.tcctransaction.repository 目录下的实现。

rollback()

我们看一下方法rollback()是如何实现事务回滚逻辑的

[TransactionManager.java]
public void rollback(boolean asyncRollback) {

    // 从ThreadLocal中获取当前事务    
    final Transaction transaction = getCurrentTransaction();
    transaction.changeStatus(TransactionStatus.CANCELLING);
    // 更新事务状态为CANCELLING
    transactionRepository.update(transaction);
    // 如果异步rollback属性为true
    if (asyncRollback) {
        try {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    // 通过线程池执行回滚逻辑
                    rollbackTransaction(transaction);
                }
            });
        } catch (Throwable rollbackException) {
            logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    } else {
        // 异步rollback设置为false,同步执行回滚
        rollbackTransaction(transaction);
    }
}

和commit方法类似,在rollback(boolean asyncRollback)执行事务的回滚操作,具体的操作在rollbackTransaction(transaction)中执行:

private void rollbackTransaction(Transaction transaction) {
    try {
        // 事务回滚
        transaction.rollback();
        // 删除本次回滚的本地事务记录,如果rollback异常,不会把数据库内事务记录删除,
        // 通过job重试进行补偿
        transactionRepository.delete(transaction);
    } catch (Throwable rollbackException) {
        logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
        throw new CancellingException(rollbackException);
    }
}
cleanAfterCompletion(transaction)

无论是否提交/回滚,最终都会执行cleanAfterCompletion(transaction)方法进行现场清理操作。

public void cleanAfterCompletion(Transaction transaction) {
    if (isTransactionActive() && transaction != null) {
        // 从ThreadLocal中获取当前事务
        Transaction currentTransaction = getCurrentTransaction();‘
        // 弹出当前事务
        if (currentTransaction == transaction) {
            CURRENT.get().pop();
            if (CURRENT.get().size() == 0) {
                CURRENT.remove();
            }
        } else {
            throw new SystemException("Illegal transaction when clean after completion");
        }
    }
}

事务执行结束,从栈中弹出当前结束的事务。

providerMethodProceed(compensableMethodContext)

看完rootMethodProceed根事务切面逻辑,再来看提供者切面事务逻辑就好理解多了,方法逻辑如下:

private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

    // 获取异步回滚、异步提交标识
    Transaction transaction = null;
    boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
    boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

    try {
        // 判断当前事务状态
        switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
            // 如果事务状态为TRYING
            case TRYING:
                //  通过使用transactionContext创建分支事务
                transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
                // 执行被切方法逻辑
                return compensableMethodContext.proceed();

            // 如果事务状态为CONFIRMING
            case CONFIRMING:
                try {
                    // 对事务状态进行更新
                    transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                    // 提交事务,不执行切面方法
                    transactionManager.commit(asyncConfirm);
                } catch (NoExistedTransactionException excepton) {
                    //the transaction has been commit,ignore it.
                }
                break;

            // 如果事务状态为CANCELLING
            case CANCELLING:
                try {
                    // 更新事务状态
                    transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                    // 执行事务回滚,不执行切面方法
                    transactionManager.rollback(asyncCancel);
                } catch (NoExistedTransactionException exception) {
                    //the transaction has been rollback,ignore it.
                }
                break;
        }

    } finally {
        // 对现场进行清理
        transactionManager.cleanAfterCompletion(transaction);
    }

    Method method = compensableMethodContext.getMethod();
    // 处理原始类型返回值,返回原始类型的默认值,因为不能返回null
    return ReflectionUtils.getNullValue(method.getReturnType());
}

public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {

    // 根据事务id从事务持久化组件中查询到本事务
    Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
    // 不为空
    if (transaction != null) {
        // 对事务状态进行更新,根据传参不同,执行TRYING->CONFIRMING或者TRYING->CANCELING等操作
        transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
        // 对事务栈进行操作,执行嵌套事务入栈
        registerTransaction(transaction);
        return transaction;
    } else {
        throw new NoExistedTransactionException();
    }
}

这里进行小结,可以看到在provider类型的方法切面,对于远程的Participant,如果transaction的status为trying,则通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑;

如果是status为confirming或canceling,则会调用对应的confirm或cancel配置的方法,跳过被切方法

对于普通类型方法直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地proxy方法,所以直接执行即可

ResourceCoordinatorAspect

ResourceCoordinatorAspect切面主要是为了执行资源协调,它的实现为ConfigurableCoordinatorAspect

[ResourceCoordinatorAspect.java]
@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {
    }

    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }

    public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
        this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
    }

    public abstract int getOrder();
}

[ConfigurableCoordinatorAspect.java]
@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {

    private TransactionConfigurator transactionConfigurator;

    public void init() {
        ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
        resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
        this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }

    public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
        this.transactionConfigurator = transactionConfigurator;
    }
}

ConfigurableCoordinatorAspect的职责为设置事务的参与者;在一个事务内,每个被@Compensable注解的方法都是事务参与者。

可以看到该切面的优先级为 Ordered.HIGHEST_PRECEDENCE + 1,order的数值大于CompensableTransactionAspect。由于 @Order中的值越小,优先级越高,因此切面ResourceCoordinatorAspect的优先级小于CompensableTransactionAspect。

从代码可以看出,设置事务参与者逻辑是通过ResourceCoordinatorInterceptor.interceptTransactionContextMethod方法执行的。

[ResourceCoordinatorInterceptor.java]
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {

    // 从当前ThreadLocal中获取事务
    Transaction transaction = transactionManager.getCurrentTransaction();
    if (transaction != null) {
        switch (transaction.getStatus()) {
            case TRYING:
                // 只需要在TRYING阶段将参与者的信息提取出来设置到transaction中
                enlistParticipant(pjp);
                break;
            case CONFIRMING:
                break;
            case CANCELLING:
                break;
        }
    }
    // 执行目标方法
    return pjp.proceed(pjp.getArgs());
}

我们可以得知,在trying阶段,框架会把所有事务参与者加入到当前事务中去。

对于Root方法,先创建主事务,事务参与者包括Root方法对应的本地参与者及Normal方法对应的远程参与者;

对于Provider方法,首先通过主事务上下文创建分支事务,事务参与者包括Provider方法对应的本地参与者以及它所包含的Normal方法对应的远程参与者。而远程参与者又可以开启新的分支事务。

我们可以合理的猜想,如果事务嵌套的层级很多,一定会存在性能问题。

enlistParticipant(pjp)

我们详细看一下enlistParticipant(pjp)是如何生成的事务参与者对象。

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {

    // 首先获取@Compensable信息
    Method method = CompensableMethodUtils.getCompensableMethod(pjp);
    if (method == null) {
        // @Compensable标注的方法为空则抛出异常
        throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
    }
    Compensable compensable = method.getAnnotation(Compensable.class);

    // 回去confirm和cancle方法名
    String confirmMethodName = compensable.confirmMethod();
    String cancelMethodName = compensable.cancelMethod();
    // 获取当前事务以及全局事务id
    Transaction transaction = transactionManager.getCurrentTransaction();
    TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

    // 设置事务上下文到Editor中
    // Editor用来统一提取事务上下文,如果是dubbo则对应设置dubbo的rpc上下文
    // 此处的上下文设置之后就会调用try逻辑
    if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
        FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
    }

    // 通过目标类名,方法名,参数类型获取目标类
    Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

    // confirm逻辑调用上下文
    InvocationContext confirmInvocation = new InvocationContext(targetClass,
            confirmMethodName,
            method.getParameterTypes(), pjp.getArgs());

    //cancel逻辑调用上下文
    InvocationContext cancelInvocation = new InvocationContext(targetClass,
            cancelMethodName,
            method.getParameterTypes(), pjp.getArgs());

    // 此处较为关键,confirm和cancle具有相同地位,都被抽象成InvocationContext
    Participant participant =
            new Participant(
                    xid,
                    confirmInvocation,
                    cancelInvocation,
                    compensable.transactionContextEditor());
    // 将participant设置到transaction中,并同步到持久化存储中
    transactionManager.enlistParticipant(participant);
}

[TransactionManager.java]
public void enlistParticipant(Participant participant) {
    Transaction transaction = this.getCurrentTransaction();
    transaction.enlistParticipant(participant);
    transactionRepository.update(transaction);
}

[Transaction.java]
public void enlistParticipant(Participant participant) {
    participants.add(participant);
}

从上述的代码逻辑中,我们可以得到结论,CompensableTransactionAspect开启事务,ResourceCoordinatorAspect对注解@Compensable进行解析,将confirm与cancel的具体逻辑设置到事务管理器中。

当上述两个切面都执行完成之后,开始执行try中的方法。如果try成功则执行commit否则执行rollback。

每个分支事务最终被封装到Transaction的participants中,每个分布式事务都有一个自己的 ThreadLocal

我们再次回顾commit的逻辑,查看Transaction.commit()方法

[Transaction.java]
public void commit() {
    // 对每一个分支执行提交操作
    for (Participant participant : participants) {
        participant.commit();
    }
}

participant就是切面ResourceCoordinatorAspect 添加的。我们再看一下participant.commit()的逻辑:

[Transaction.java]
public void commit() {
    terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}

可以看到最终事务提交是通过invoke反射实现的,我们进入invoke逻辑

public Object invoke(TransactionContext transactionContext, 
                    InvocationContext invocationContext, 
                    Class<? extends TransactionContextEditor> transactionContextEditorClass) {
    // 如果事务执行上下文方法名不为空
    if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
        try {
            Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();

            Method method = null;

            method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
            // 实例化原事务执行者的代理对象
            FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
            // 反射执行
            return method.invoke(target, invocationContext.getArgs());

        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
    return null;
}

最终通过method.invoke(target, invocationContext.getArgs())方法完成了真实的事务提交操作。

到此我们对TCC-TRANSACTION的事务提交主流程进行了完整的分析。

通过分析我们可以知道TCC-TRANSACTION的核心逻辑是通过两个切面CompensableTransactionAspect、ResourceCoordinatorAspect 实现的。通过对事务进行包装与代理,实现了类二阶段的分布式事务解决方案。

实际上,TCC-TRANSACTION还有一个重要的补偿逻辑我们还没有分析,它是基于定时调度实现的。

限于本文的篇幅,就不再继续展开。我将单独用一篇文章来对TCC-TRANSACTION的补偿过程进行分析,我们下文再会。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK