本篇基于本地事件表加 ActiveMQ 实现分布式事务。
本篇是微服务应用(六):分布式事务概念及解决方案,微服务应用(七):本地事件表加消息队列实现分布式事务思路 的延续。
分布式事务通俗理解
是把原来在单点系统中至少有两个操作数据库的步骤的业务,拆成在至少两个系统,各自分别执行自己的数据库操作;由原来单点系统一个数据库的事务转为分布式事务,核心问题是在分布式环境下如何确保数据一致性。
单点系统单库的事务保证数据一致性的核心是依赖数据回滚,即只要一步操作出现异常,则整个事务内的所有操作都会回滚。
而分布式系统要实现数据同步更新并不难,难在出现异常时要实现跨系统的数据回滚,重点是在下游系统业务处理出现异常后,要想办法让上游系统知道并回滚数据(若上游系统自身业务处理抛出异常,原则上是不能调用下游业务系统,杜绝垃圾数据污染到下游系统)。
分布式环境要实现事务回滚的功能,就有必要引入一个通信机制来通知(MQ,或回调)上游系统,上游系统收到回传的消息来判断是否回滚回据或发起重试。
这里又出现了另一个问题,引入通信机制增加了系统的复杂性,若上游系统业务处理成功,但通信失败不可达或下游系统无法正常收到通知,下游就存在数据丢失导致数据不一致的情况,这里就需要引入事件表 的概念。
系统的每一步数据库操作都是一个事件并记录事件表中,事件表和业务数据表操作在同一个事务中,下游系统收到通知业务处理成功后,通知上游系统更新事件表中的状态。
如果因通信机制异常或接收通知失败,则可以通过事件表追溯自身系统的数据库操作,或通过定时任务来获取未处理的事件来重新发起通知。
分布式事务问题
一个业务涉及两个服务的操作,且是强相关性的,这里就涉及到了分布式事务,必须保证每一步操作是原子性的。但要考虑以下问题场景:
- 用户服务保存用户记录后没来得争发送消息就宕机,如果保证新增用户后一定有消息发送到队列?
- 积分服务收到消息后,还没来得及保存积分记录就宕机,如何保证系统重启后不丢失积分记录?
这两个问题的本质是如何让数据库和消息队列的操作是一个原子操作,这就需要引入事件表(t_event),见如下 SQL:
CREATE TABLE `t_event` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`type` varchar(30) DEFAULT NULL COMMENT '事件类型',
`process` varchar(30) DEFAULT NULL COMMENT '事件进行到的环节',
`content` text COMMENT '事件内容,事件发生时需要传递的数据',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
需要在 user_db 和 score_db 中都创建 t_event 表,用户服务操作用户表(t_user) 和 事件表(t_event )在同一个事务中;积分服务操作积分表(t_score)和 事件表(t_event)同样也在同一个事务中。
实现步操分析
- 用户服务接收到请求后,在 t_user 表中新增一条记录,并在 t_event 表中创建一条 process 为 NEW 的事件,content 保存要创建的积分数据,JSON字符串格式,提交事务。
- 用户系统中开启一个定时任务,定时查询 t_event 表中所有 process 为 NEW 的记录,有记录则向 MQ 发送消息,消息内容是 t_event 表的 content,消息发送成功后,更改 process 为 PUBLISHED,提交事务。
- 积分系统监听队列收到消息后,在 t_event 表中新增一条 process 为 PUBLISHED 的记录,content 为消息内容,保存成功后提交事务。
- 积分系统开启一个定时任务,定时查询 t_event 表中所有 process 为 PUBLISHED 的记录,将记录中的 content 字段内容转换为积分对象,保存积分记录成功后,修改 process 为 PROCESSED,提交事务。
分析操作步骤对原子性的支持:
- 步骤1 宕机,因业务和事件在同一个事务中,只要有一个失败,则都失败。
- 步骤2 宕机,系统重启后定时任务重新查询之前没有发布成功的事件记录继续发送消息。
- 步骤3 宕机,即接收消息时宕机,则由 MQ 的重发机制保证重新将事件发送给对应的服务。
- 步骤4 宕机,事件消息接收成功但在处理时宕机,则系统重启后定时任务会重新对之前没有处理成功的事件进行处理。
这样就保证了两个数据源之间数据状态的最终一致性。
代码实现示例
本章节模拟用户系统和积分系统的交互,新用户注册之后给该用户新增一条积分记录。用户服务创建用户之后向 MQ 的某个主题发送一条创建用户的消息,积分系统监听该主题,一旦接收到用户创建的消息,积分系统就在自己的 DB 中创建一条积分记录。
公共环境
已安装好 ActiveMQ 服务,安装步骤可参考 Spring Boot 2实践系列(三十四):集成 AcitveMQ 消息中间件 。
基于 Spring Boot 创建应用服务,用户服务和积分服务都引入依赖如下。实际项目应建一个父工程,pom.xml 文件设置打 pom 包,公共依赖在父工程 pom.xml 文件中添加。
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--activemq-pool--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
ActiveMQConfig
此配置开启 ActiveMQ 发布订阅(主题)模式,在此示例中并没有用到主题模式。
@Configuration public class ActiveMQConfig { @Bean public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory(); topicListenerContainer.setPubSubDomain(true); topicListenerContainer.setConnectionFactory(activeMQConnectionFactory); return topicListenerContainer; } }
开启定时任务功能
也可以将 @EnableScheduling 注解作用在入口类上。
@Configuration @EnableScheduling public class ScheduleConfig { }
用户服务
用户DB
用户表:
CREATE TABLE `t_user` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_name` varchar(50) DEFAULT NULL COMMENT '姓名',
`create_time` datetime DEFAULT NULL COMMENT '创建日期',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
事件表:
CREATE TABLE `t_event` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`type` varchar(30) DEFAULT NULL COMMENT '事件类型',
`process` varchar(30) DEFAULT NULL COMMENT '事件进行到的环节',
`content` text COMMENT '事件内容,事件发生时需要传递的数据',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
用户系统
application.properties 配置
server.port=8002 #=============jdbc dataSource========================= spring.datasource.url=jdbc:mysql://127.0.0.1:3306/tx_user_db?characterEncoding=utf-8&allowMultiQueries=true&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8 spring.datasource.username=panda spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.maximum-pool-size=50 spring.datasource.hikari.minimum-idle=20 spring.datasource.hikari.idle-timeout=60000 spring.datasource.hikari.connection-timeout=600000 spring.datasource.hikari.max-lifetime=1800000 spring.datasource.hikari.validation-timeout=10000 spring.datasource.hikari.connection-test-query=select 1 spring.activemq.broker-url=tcp://127.0.0.1:61616 spring.activemq.user=admin spring.activemq.password=123456 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=50
业务实体类
与用户表映射的用户实体类
@Data @Accessors(chain = true) @Table("t_user") public class User { @Id private Long id; private String userName; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date createTime; public User() { } public User(String userName) { this.userName = userName; } public User(String userName, Date createTime) { this.userName = userName; this.createTime = createTime; } }
积分实体类
@Data @Accessors(chain = true) @Table("t_score") public class Score { @Id private Long id; private Long userId; private Integer score; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date createTime; public Score() { } public Score(Long userId, Integer score, Date createTime) { this.userId = userId; this.score = score; this.createTime = createTime; } }
用户服务业务处理
用户业务接口
public interface UserService { User addUser(User user) throws Exception; }
业务处理实现
@Service public class UserServiceImpl implements UserService { @Autowired private UserRepository userRepository; @Autowired private EventManager eventManager; @Override @Transactional(rollbackFor = Exception.class) public User addUser(User user) { user = userRepository.save(user); eventManager.eventHandle(user.getId()); return user; } }
数据持久层操作
Spring Data JDBC 提供了
JdbcTemplate
操作数据库,也支持简单的 Repository 操作。@Repository public interface UserRepository extends CrudRepository<User, Long> { }
事件处理
在一个事件管理类里统一处理事件
@Service public class EventManager { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private EventRepository eventRepository; @Autowired private JmsTemplate jmsTemplate; /** * 发送事件到消息队列 * @param queueName * @param event */ public void sendEventQueue(String queueName, Event event) { jmsTemplate.convertAndSend(queueName, JSON.toJSONString(event)); this.updateEvent(event); } /** * 发送事件列表到消息队列 * @param queueName * @param eventList */ public void sendEventQueue(String queueName, List<Event> eventList) { if (!CollectionUtils.isEmpty(eventList)) { for (Event event : eventList) { this.sendEventQueue(queueName, event); } } } /** * 更新事件进度 * @param event */ public void updateEvent(Event event){ String sql = "update t_event set process = ?, update_time = ? where id = ?"; jdbcTemplate.update(sql, EventProcess.PUBLISHED.getValue(), new Date(), event.getId()); } /** * 查询状态是 NEW 的事件 * @return */ public List<Event> getNewEventList() { String sql = "SELECT * FROM t_event e WHERE e.process = ?"; RowMapper<Event> rowMapper = (rs, rowNum) -> { Event event = new Event(); event.setContent(rs.getString("content")); event.setId(rs.getLong("id")); event.setProcess(rs.getString("process")); event.setType(rs.getString("type")); event.setCreateTime(rs.getTimestamp("create_time")); event.setUpdateTime(rs.getTimestamp("update_time")); return event; }; List<Event> eventList = jdbcTemplate.query(sql, new Object[]{EventProcess.NEW.getValue()}, rowMapper); return eventList; } /** * 保存事件,并发送到消息队列 * @param event * @return */ public Event saveEvent(Event event) { event = eventRepository.save(event); this.sendEventQueue("user-success-queue", event); return event; } /** * 积分事件处理 * @param id */ public void eventHandle(Long id) { Score score = new Score(id, 10, new Date()); Event event = new Event(EventType.CREATE.toString(), EventProcess.NEW.getValue(), JSON.toJSONString(score)); event.setCreateTime(new Date()); this.saveEvent(event); } }
与事件表映射的事件实体类
@Data @Accessors(chain = true) @Table("t_event") public class Event { /** * 主键 */ @Id private Long id; /** * 事件类型 */ private String type; /** * 事件过程 */ private String process; /** * 事件内容 */ private String content; /** * 创建时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date createTime; /** * 更新时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date updateTime; public Event() { } public Event(String type, String process, String content) { this.type = type; this.process = process; this.content = content; } }
事件处理阶段枚举类
public enum EventProcess { NEW("NEW","新建"), PUBLISHED("PUBLISHED","已发布"), PROCESSED("PROCESSED","已处理"); private String value; private String desc; EventProcess(String value, String desc) { this.value = value; this.desc = desc; } //---------get/set方法------------- }
事件业务类型枚举类
public enum EventType { CREATE, UPDATE, DELETE }
事件持久操作
public interface EventRepository extends CrudRepository<Event, Long> { }
事件定时任务
@Component @EnableScheduling public class UserScheduled { private static final Logger logger = LogManager.getLogger(UserScheduled.class); @Autowired private UserService userService; @Autowired private EventManager eventManager; @Scheduled(cron = "*/5 * * * * *") public void executeEvent() { List<Event> eventList = eventManager.getNewEventList(); if (!CollectionUtils.isEmpty(eventList)) { System.out.println("新建用户的事件记录总数:" + eventList.size()); eventManager.sendEventQueue("user-success-queue", eventList); } else { System.out.println("待处理的事件总数为:0"); } } }
积分服务
积分DB
积分表:
CREATE TABLE `t_score` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户ID',
`score` int(11) DEFAULT NULL COMMENT '总积分',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
事件表:
CREATE TABLE `t_event` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`source_id` bigint(20) DEFAULT NULL COMMENT '事件源ID',
`type` varchar(30) DEFAULT NULL COMMENT '事件类型',
`process` varchar(30) DEFAULT NULL COMMENT '事件进行到的环节',
`content` text COMMENT '事件内容,事件发生时需要传递的数据',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
积分系统
application.properties 配置
配置同用户服务的配置,只需改下数据库和ActiveMQ 服务的连接地址。
积分实体类
积分实体类与积分表映射,同用户服务中的积分实体类。
积分业务类
监听消息队列,收到消息写入积分库的事件表,保存积分记录
@Service public class ScoreServiceImpl implements ScoreService { private static final Logger logger = LogManager.getLogger(ScoreServiceImpl.class); @Autowired private ScoreRepository scoreRepository; @Autowired private EventManager eventManager; @Transactional(rollbackFor = Exception.class) @JmsListener(destination = "user-success-queue") public void handleUserSuccess(String message) { System.out.println("队列监听到的文本:" + message); Event event = null; try { event = JSON.parseObject(message, Event.class); String content = event.getContent(); Score score = JSON.parseObject(content, Score.class); //事件 event.setProcess(EventProcess.PUBLISHED.getValue()); event.setSourceId(event.getId()).setId(null); eventManager.saveEvent(event); //保存积分 scoreRepository.save(score); //更新事件阶段 eventManager.updateEventProcessToProcessed(event); // throw new RuntimeException(); } catch (Exception e) { /*这里处理有些问题 * 抛出异常,默认消息会重发6次再送到死信队列 * 下面执行发送消息到失败队列并不起作用 * 估计要自定义重发机制和死信队列 * 待完善.....*/ eventManager.sendEventQueue("score-failure-queue", event); throw e; } } @Override public void saveScore(Score score) { scoreRepository.save(score); } }
积分服务接口
public interface ScoreService { void saveScore(Score score); }
持久层接口
@Repository public interface EventRepository extends CrudRepository<Event, Long> { } @Repository public interface ScoreRepository extends CrudRepository<Score, Long> { }
事件处理
事件实体类:
事件实体类与事件表映射,同用户服务中的事件表,但多了个 sourId 属性。
事件枚举类
事件处理阶段枚举类 和 事件业务类型枚举类 与 用户服务中的枚举类相同。
事件处理类
@Service public class EventManager { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private EventRepository eventRepository; @Autowired private JmsTemplate jmsTemplate; /** * 发送事件到消息队列 * * @param queueName * @param event */ public void sendEventQueue(String queueName, Event event) { jmsTemplate.convertAndSend(queueName, JSON.toJSONString(event.getContent())); this.updateEventProcessToProcessed(event); } /** * 发送事件列表到消息队列 * * @param queueName * @param eventList */ public void sendEventQueue(String queueName, List<Event> eventList) { if (!CollectionUtils.isEmpty(eventList)) { for (Event event : eventList) { this.sendEventQueue(queueName, event); } } } /** * 更新事件进度 * * @param event */ public void updateEventProcessToProcessed(Event event) { String sql = "update t_event set process = ?, update_time = ? where id = ?"; jdbcTemplate.update(sql, EventProcess.PROCESSED.getValue(), new Date(), event.getId()); } public void updateEventProcessToPublished(Event event) { String sql = "update t_event set process = ?, update_time = ? where id = ?"; jdbcTemplate.update(sql, EventProcess.PUBLISHED.getValue(), new Date(), event.getId()); } /** * 查询状态是 NEW 的事件 * * @return */ public List<Event> getNewEventList() { String sql = "SELECT * FROM t_event e WHERE e.process = ?"; RowMapper<Event> rowMapper = (rs, rowNum) -> { Event event = new Event(); event.setContent(rs.getString("content")); event.setId(rs.getLong("id")); event.setProcess(rs.getString("process")); event.setType(rs.getString("type")); event.setCreateTime(rs.getTimestamp("create_time")); event.setUpdateTime(rs.getTimestamp("update_time")); return event; }; List<Event> eventList = jdbcTemplate.query(sql, new Object[]{EventProcess.NEW.getValue()}, rowMapper); return eventList; } public List<Event> getPublishedEventList() { String sql = "SELECT * FROM t_event e WHERE e.process = ?"; RowMapper<Event> rowMapper = (rs, rowNum) -> { Event event = new Event(); event.setContent(rs.getString("content")); event.setId(rs.getLong("id")); event.setProcess(rs.getString("process")); event.setType(rs.getString("type")); event.setCreateTime(rs.getTimestamp("create_time")); event.setUpdateTime(rs.getTimestamp("update_time")); return event; }; List<Event> eventList = jdbcTemplate.query(sql, new Object[]{EventProcess.PUBLISHED.getValue()}, rowMapper); return eventList; } /** * 保存事件,并发送到消息队列 * * @param event * @return */ public Event saveEventAndSendMsg(Event event) { try { this.saveEvent(event); this.sendEventQueue("user-success-queue", event); } catch (Exception e) { e.printStackTrace(); } return event; } public Event saveEvent(Event event) { event = eventRepository.save(event); return event; } /** * 积分事件处理 * * @param id */ public void eventHandle(Long id) { Score score = new Score(id, 10, new Date()); Event event = new Event(EventType.CREATE.toString(), EventProcess.NEW.getValue(), JSON.toJSONString(score)); event.setCreateTime(new Date()); this.saveEvent(event); } }
定时任务
public class ScoreScheduled { private static final Logger logger = LogManager.getLogger(ScoreScheduled.class); @Autowired private ScoreService scoreService; @Autowired private EventManager eventManager; @Scheduled(cron = "*/5 * * * * *") public void executeEvent() { List<Event> eventList = eventManager.getPublishedEventList(); if (!CollectionUtils.isEmpty(eventList)) { System.out.println("待处理的事件记录总数:" + eventList.size()); for (Event event : eventList) { Score score = JSON.parseObject(event.getContent(), Score.class); scoreService.saveScore(score); eventManager.updateEventProcessToProcessed(event); } } else { System.out.println("待处理的事件总数为:0"); } } }
事务回滚
用户服务是上游系统,积分服务是下游系统。当积分服务在处理业务时出现异常,要发送失败的消息通知道用户服务,用户服务收到失败消息可对自己的业务数据进行回滚,或发起重试。
此示例使用的是 ActiveMQ 作为消息通信,使用的是 JMS 的提供的监听,在捕抓到异常的 catch 里面无法执行发送失败的消息到失败队列,而是 ActivelyMQ 会执行默认的重试机制,重试 6 次,没有被正确消息则消息转发到默认的死信队列(ActiveMQ.DLQ
)。
以下是猜测的处理方案,待验证。。。。。
- 使用 ActiveMQ 原生的监听消息队列,不使用 JMS ,在异常时手动处理。。。。。
- 自定义死信队列,上游系统监听此死信队列,收到失败的消息再根据需要进行处理。
如果使用 RabbitMQ 作为消息中间件,则不存在上面的问题,在 catch 中抛出 AmqpRejectAndDontRequeueException
异常,使消息不进入死信队列,可在 catch 中发送消息 。
注意:本文归作者所有,未经作者允许,不得转载