1

【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分...

 1 year ago
source link: https://blog.51cto.com/alex4dream/5815022
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)

精选 原创

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!

分布式事务方案设计

实际运用理论时进行架构设计时,许多人容易犯“手里有了锤子,看什么都觉得像钉子”的错误,设计方案时考虑的问题场景过多,各种重试,各种补偿机制引入系统,导致系统过于复杂,落地遥遥无期。

  1. 有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。
  2. 设计分布式事务系统也不是需要考虑所有异常情况,不必过度设计各种回滚,补偿机制。
  3. 如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。

如果系统要实现回滚流程的话,有可能系统复杂度将大大提升,且很容易出现 Bug,估计出现 Bug 的概率会比需要事务回滚的概率大很多。

RocketMQ的事务消息实现

【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)_spring

搭建RocketMQ

解压压缩包
unzip rocketmq-all-4.7.1-bin-release.zip
切换⽬录到RocketMQ根⽬录
cd rocketmq-all-4.7.1-bin-release
启动Name Server
nohup sh bin/mqnamesrv &
验证是否启动OK:
tail -f ~/logs/rocketmqlogs/namesrv.log
如果成功启动,能看到类似如下的⽇志:
2019-07-18 17:03:56 INFO main - The Name Server boot success. ...
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
验证是否启动OK:
tail -f ~/logs/rocketmqlogs/broker.log
如果启动成功,能看到类似如下的⽇志:
2019-07-18 17:08:41 INFO main - The broker[itmuchcomdeMacBook-Pro.local, 192.16
8.43.197:10911] boot success. serializeType=JSON and name server is localhost:9876
消息⽣产者代码
MAVEN依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test-group
发送者Java代码
Controller代码
@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
private final ShareService shareService;
@PutMapping("/audit/{id}")
public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
return this.shareService.auditById(id, auditDTO);
}
}
Service代码
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
// 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常
Share share = this.shareMapper.selectByPrimaryKey(id);
if (share == null) {
throw new IllegalArgumentException("参数⾮法!该分享不存在!");
}
if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
throw new IllegalArgumentException("参数⾮法!该分享已审核通过或审核不通过!");
}
// 3. 如果是PASS,那么发送消息给rocketmq,让⽤户中⼼去消费,并为发布⼈添加
if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
// 发送半消息
String transactionId = UUID.randomUUID().toString();
this.rocketMQTemplate.sendMessageInTransaction("add-bonus",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)// header也有妙⽤...
.setHeader("share_id", id).build(),auditDTO// arg有⼤⽤处 );
} else {
this.auditByIdInDB(id, auditDTO);
}
return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
// 4. 把share写到缓存
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
this.auditByIdInDB(id, auditDTO);
this.rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享...").build());
}
Listener代码
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
private final ShareService shareService;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.get("share_id"));
try {
this.shareService.auditByIdWithRocketMqLog(shareId, (ShareAuditDTO)arg, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// select * from xxx where transaction_id = xxx
RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder()
.transactionId(transactionId).build());
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
消息消费者代码
Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
rocketmq:
name-server: 127.0.0.1:9876
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
@Override
public void onMessage(UserAddBonusMsgDTO message) {
// 1. 为⽤户加
Integer userId = message.getUserId();
Integer bonus = message.getBonus();
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);
// 2. 记录⽇志到bonus_event_log表⾥⾯
this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE")
.createTime(new Date()).description("投稿加积分..").build());
log.info("积分添加完毕...");
}
}
  • 打赏
  • 1
  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK