8

使用Spring TransactionSynchronization执行事务后提交的调度方法 - Singh

 2 years ago
source link: https://www.jdon.com/57371
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
使用Spring TransactionSynchronization执行事务后提交的调度方法

这篇博客试图解释我们如何利用Spring的TransactionSynchronization来实现在事务提交后执行业务代码,以及如何使用 Spring AOP巧妙优雅实现的。

在spring中使用事务时,往往需要一些特定的方法在事务提交成功后才能运行,我们通过一个例子来理解这一点:-

假设一个系统中有两个微服务,第一个管理用户生命周期,我们称之为用户管理服务(UMS),另一个为用户准备订阅源(Feed Service)。

当新用户签约在UMS做一些验证,然后将一条消息推向create_feed的Apache Kafka的主题Topic。另一方面,Feed服务侦听有关create_feed主题的消息,然后调用 UMS 以获取用户的兴趣以准备Feed。

你看到这个流程有很大的问题吗!?如果注册过程中的任何后续方法失败 - 用户详细信息从未保存在系统中并且 Feed 服务的get user interest调用将会失败,该怎么办?当谈论多个微服务交互时,这个问题会被放大。

可能还有许多其他要求需要某些特定方法仅执行事务后提交,让我们看看如何优雅地解决这个问题!

解决方案

可以在TransactionSynchronizationAdapter的事务提交afterCommit方法后执行一段代码:

TransactionSynchronizationManager.registerSynchronization(
    new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // code for publishing message to kafka
        }
});

虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。

让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。

#1 构建注释

@Target(ElementType.METHOD) 
@Retention(RetentionPolicy.RUNTIME) 
public @interface PostCommit { 
}

这部分很简单,类似于在 Java 中创建任何其他注释

#2 构建 PostCommitAdapter

PostCommitAdapter 将提供两个功能

  1. 它将注册runnables到一个ThreadLocal
  2. 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
@Slf4j 
@Component 
public class PostCommitAdapter extends TransactionSynchronizationAdapter { 
    private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>(); 

    // 为提交后执行注册一个新的 runnable
     public void execute(Runnable runnable) { 
        if (TransactionSynchronizationManager.isSynchronizationActive()) { 
            List<Runnable> runnables = RUNNABLE.get(); 
            if (runnables == null) { 
                runnables = new ArrayList<>(); 
                RUNNABLE.set(runnables); 
                TransactionSynchronizationManager.registerSynchronization(this); 
            }
            return; 
        }
        // 如果事务同步未激活
        runnable.run(); 
    } 

    @Override 
    public void afterCommit() { 
        List<Runnable> runnables = RUNNABLE.get(); 
        runnables.forEach(Runnable::run); 
    } 

    @Override 
    public void afterCompletion(int status) { 
        RUNNABLE.remove(); 
    } 
}

如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables

#3 使用 around 建议连接适配器和注释

为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:

@Aspect
@Slf4j
@AllArgsConstructor
@Configuration
public class PostCommitAnnotationAspect {
    private final PostCommitAdapter postCommitAdapter;

    @Pointcut("@annotation(com...<package>..PostCommit)")
    private void postCommitPointCut(){}


    @Around("postCommitPointCut()")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp));
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    }
}

用法

一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。

示例:考虑具有 PostCommit 注释方法的两个类 A 和 B

Class A {
   @PostCommit
   void log(){
      log.info("log from class A")
   }
}
Class B {
   @PostCommit
   void log(){
      log.info("log from class B")
   }
}

一个驱动类调用这些方法:

Class mainClass {
      @Transactional
      void transactionalMethod(Entity entity){
          someOperation(entity)
          log.info("inside transaction");
          a.log();
          b.log();
          save(entity);
          log.info("end of method");
      }
}

执行后输出:

> inside transaction
> ** saving entity
> log from class A
> log from Class B

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK