原理底层计划--分布式事务
迪丽瓦拉
2024-05-29 09:57:20
0

分布式事务

mysql事务

我们通过show engines查询存储引擎,mysql一般为innodb,
为什么?
因为innodb支持事务是原因之一。

特性无非ACID
原子性,一致性,隔离性,持久性
一致性是最后追求的结果,也就保证了数据的安全性。
innodb自动有事务,我们不需要再搭建事务。
只是我们要关注事务并发出现的问题

🌴 脏读:对于两个事务T1,T2,T1读取了已经被T2更新但还没有被提交的字段之后,若T2回滚,T1读取的内容就是临时且无效的
🌴不可重复读 :对于两个事务T1,T2,T1读取了一个字段,然后T2更新了该字段之后,T1在读取同一个字段,值就不同了
🌴 幻读:对于两个事务T1,T2,T1在A表中读取了一个字段,然后T2又在A表中插入了一些新的数据时,T1再读取该表时,就会发现神不知鬼不觉的多出几行了…

隔离级别:可提交读 不可提交读 可串行化

  1. read uncommitted(读未提交数据):允许事务读取未被其他事务提交的变更。(脏读、不可重复读和幻读的问题都会出现)。
  2. read committed(读已提交数据):只允许事务读取已经被其他事务提交的变更。(可以避免脏读,但不可重复读和幻读的问题仍然可能出现)
    3.repeatable read(可重复读):确保事务可以多次从一个字段中读取相同的值,在这个事务持续期间,禁止其他事务对这个字段进行更新(update)。(可以避免脏读和不可重复读,但幻读仍然存在)
  3. serializable(串行化):确保事务可以从一个表中读取相同的行,在这个事务持续期间,禁止其他事务对该表执行插入、更新和删除操作,所有并发问题都可避免,但性能十分低下(因为你不完成就都不可以弄,效率太低)

select @@tx_isolation 语句查询当前的隔离级别,使用最多:读已提交、可重复读

Spring事务一个注解解决

@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)

Propagation.REQUIRED

支持当前事务,如果当前没有事务,就新建一个事务。(默认)
Propagation.SUPPORTS

支持当前事务,如果当前没有事务,就以非事务方式执行。
Propagation.MANDATORY

支持当前事务,如果当前没有事务,就抛出异常。
Propagation.REQUIRES_NEW

新建事务,如果当前存在事务,把当前事务挂起。
Propagation.NOT_SUPPORTED

以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
Propagation.NEVER

以非事务方式执行,如果当前存在事务,则抛出异常。
Propagation.NESTED

如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。

分布式事务

Spring Boot 微服务里面有个注解是 @EnableTransactionManagement ,其实这个就是本地事务。这个不是分布式事务。

redis可以支持分布式事务实现,但是不推荐使用;
redis也可以做消息中间件,但不推荐使用。
redis更擅长的事缓存,非关系型数据库层面。

rocketMq支持分布式事务,Kafka不支持事务
RocketMQ事务流程关键

事务消息在一阶段对用户不可见
事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。
如何处理第二阶段的失败消息?
在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。
当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。
消息状态 事务消息有三种状态:TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

所以分布式事务解决方案参考思路如下,采用rocketmq实现,方案和假代码如下:

本地消息表(所有mq都可以)
本地消息表其实就是利用了 各系统本地的事务来实现分布式事务。

当系统 A 被其他系统B调用发生数据库表更操作,首先会更新数据库的业务表,其次会往相同数据库的消息表和记录表中插入一条数据,两个操作发生在同一个事务中
如果消息表没有插入进去,说明第一步更新业务表失败了,那么下一步就不用进行了

@Test
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void transactionStart(User user, MessageTable messageTable) {//假设user是要修改的数据userService.updateById(user);messageTableSevice.insert(messageTable);logService.insert(messageTable);//下面不一定要轮询,可以在这里同时发送mq消息}

系统 A 的脚本定期轮询或者定时器本地消息往 mq 中写入一条消息,如果消息发送失败会进行重试

@Test
//每分钟
@Scheduled(cron = "0/60 * * * * ?}")
public void loopSend() {List messageTables = messageTableSevice.selectAllByNoDel();if (CollectionUtil.isEmpty(messageTables)) {return;}messageTables.forEach(item -> {//value存放请求参数(对象转字符串),然后在b消费的时候,用到//body字节数组就行byte[] body = new byte[0];try {body = this.getByteArrayByObject(item.getValue());} catch (IOException e) {throw new RuntimeException(e);}String key = "TRANSACTION_KEY";//可靠性高采用同步SendResult sendResult = this.sendMsg(mqTopicConfig.getSaasReportTopic(), mqTopicConfig.getSaasTopicTagRunReport(), body, key);log.info("log: " + sendResult.toString());});}
/*** 对象转字符数组 假代码* @param messageTable* @return* @throws IOException*/
public byte[] getByteArrayByObject(Object messageTable) throws IOException {ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);objectOutputStream.writeObject(messageTable);byte[] bytes = byteArrayOutputStream.toByteArray();byteArrayOutputStream.close();objectOutputStream.close();return bytes;
}
public Object getObjectByByteArray(byte[] bytes) throws IOException, ClassNotFoundException {final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);final Object object = objectInputStream.readObject();byteArrayInputStream.close();objectInputStream.close();return object;
}

工具类发送消息改造一下,byte[] body不去转成String,直接传字节流就行了;如果要转String,这样:

messageBody.getBytes(StandardCharsets.UTF_8)public SendResult sendMsg(String topic, String msgTag, byte[] messageBody, String msgKey) {//byte[] bytes = messageBody.getBytes(StandardCharsets.UTF_8);org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(topic, msgTag, msgKey, messageBody);return this.send(msg, Boolean.FALSE);}
   private SendResult send(org.apache.rocketmq.common.message.Message msg, Boolean isOneWay) {try {if (isOneWay) {this.producer.sendOneway(msg);log.info("....");return null;} else {SendResult sendResult = this.producer.send(msg);if (sendResult != null) {log.info("....");return sendResult;} else {log.error("...");return null;}}} catch (Exception var4) {log.error("...");return null;}}

系统 B 消费 mq 中的消息,并处理业务逻辑。如果本地事务处理失败,会在继续消费 mq 中的消息进行重试,如果业务上的失败,可以通知系统 A 进行回滚操作,也就是根据记录的数据进行补偿;

public Action consume(Message message, ConsumeContext consumeContext) {try {log.info("start get consume");byte[] body1 = message.getBody();Object objectByByteArray = this.getObjectByByteArray(body1);MessageTable messageTable = (MessageTable) objectByByteArray;//String body = new String(message.getBody(), StandardCharsets.UTF_8);if (null == messageTable) {//如果body不存在,CommitMessagelog.info("body is null");return Action.CommitMessage;}log.info("messageTable:{}", messageTable);//本地业务 body字符串转回来对象Integer a = bService.update(messageTable);//失败的话要回滚if (a <= 0) {//根据日志信息回滚回去 就是补偿机制userService.updateById(user);}return Action.CommitMessage;} catch (Exception e) {log.error("consume fail", e);return Action.CommitMessage;}
}

相关内容