根据官网的quickstart https://seata.apache.org/zh-cn/docs/user/quickstart/
官方讲解了对应的服务结构,并提供了实例代码
按照要求启动seata服务,这里我使用docker安装
docker run -d --name seata-server \-p 8091:8091 \seataio/seata-server
获取demo源代码,gitee找了一个镜像fork下来
使用熟悉的技术栈对应的实例代码
查看配置文件,发现服务需要db以及dubbo需要zk作为注册中心。
使用docker启动对应的db和zk
创建对应的数据库和表(每个服务中有对应的sql文件,执行一份就行,都是一样的)
修改对应配置并启动相关服务
这里主要修改了db的链接和账号密码,以及zk的连接地址,seata使用file作为registry和config-center
另,源代码的日志依赖有问题导致启动失败,进行了一些调整。
梳理代码流程并测试
整体流程如图
- bisiness 购买商品
- storage 扣除对应商品的库存
- order 创建订单
- order 计算金额
- account 扣除用户账户的金额
- order 增加订单记录
代码演示
- business服务 购买方法 提交对应用户购买一定数量的对应商品
// 开启分布式事务控制
@GlobalTransactional(timeoutMills = 300000, name = "spring-dubbo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {LOGGER.info("purchase begin ... xid: " + RootContext.getXID());// 根据商品编号减少库存storageService.deduct(commodityCode, orderCount);// just test batch update//stockService.batchDeduct(commodityCode, orderCount);// 为用户创建指定商品的订单orderService.create(userId, commodityCode, orderCount);// 此处随机失败if (random.nextBoolean()) {throw new RuntimeException("random exception mock!");}
}
- 库存服务直接删除对应库存
@Override
public void deduct(String commodityCode, int count) {LOGGER.info("Stock Service Begin ... xid: " + RootContext.getXID());LOGGER.info("Deducting inventory SQL: update stock_tbl set count = count - {} where commodity_code = {}", count,commodityCode);jdbcTemplate.update("update stock_tbl set count = count - ? where commodity_code = ?",count, commodityCode);LOGGER.info("Stock Service End ... ");}
- 订单服务中扣除对应账号金额 并创建订单
@Override
public void create(String userId, String commodityCode, int orderCount) {LOGGER.info("Order Service Begin ... xid: " + RootContext.getXID());// 计算订单金额int orderMoney = calculate(commodityCode, orderCount);// 从账户余额扣款accountService.debit(userId, orderMoney);LOGGER.info("Order Service SQL: insert into order_tbl (user_id, commodity_code, count, money) values ({}, {}, {}, {})",userId, commodityCode, orderCount, orderMoney);KeyHolder keyHolder = new GeneratedKeyHolder();jdbcTemplate.update(con -> {PreparedStatement pst = con.prepareStatement("insert into order_tbl (user_id, commodity_code, count, money) values (?, ?, ?, ?)",PreparedStatement.RETURN_GENERATED_KEYS);pst.setObject(1, userId);pst.setObject(2, commodityCode);pst.setObject(3, orderCount);pst.setObject(4, orderMoney);return pst;}, keyHolder);LOGGER.info("Order Service End ... Created " + Objects.requireNonNull(keyHolder.getKey()).longValue());
}
- 账户服务扣款
@Override
public void debit(String userId, int money) {LOGGER.info("Account Service ... xid: " + RootContext.getXID());LOGGER.info("Deducting balance SQL: update account_tbl set money = money - {} where user_id = {}", money,userId);jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[]{money, userId});LOGGER.info("Account Service End ... ");
}
测试
由于方法启动在business服务的启动方法块,删除这部分代码增加一个controller调用
@SpringBootApplication
public class SpringbootDubboSeataBusinessApplication
// implements BeanFactoryAware{// private static BeanFactory BEAN_FACTORY;public static void main(String[] args) throws Exception {SpringApplication.run(SpringbootDubboSeataBusinessApplication.class, args);// 此处注释掉,改为controller 调用
// BusinessService businessService = BEAN_FACTORY.getBean(BusinessService.class);
//
// businessService.purchase("U100001", "C00321", 2);}// @Override
// public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
// BEAN_FACTORY = beanFactory;
// }
}
增加controller,并修改代码使整个方法返回订单id
@RestController
@RequestMapping("/business")
public class BusinessController {private final BusinessService businessService;@Autowiredpublic BusinessController(BusinessService businessService) {this.businessService = businessService;}@GetMapping(value = "purchase", produces = "application/json")public BusinessResult purchase(@RequestParam("userId") String userId,@RequestParam("commodityCode")String commodityCode,@RequestParam("orderCount")int orderCount){Long orderId;try {orderId = businessService.purchase(userId, commodityCode, orderCount);} catch (Exception e) {return BusinessResult.fail("失败");}return BusinessResult.success(orderId);}
}
business服务增加web依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
business服务application.properties增加web配置
server.port=8080
server.servlet.context-path=/
db插入模拟数据,由于整套代码数据库无数据也可以执行,但为了效果更明显,还是插入对应的数据。主要是用户账户和库存数据
-- 插入账户,U00001 金额1w, 没件商品固定200
insert into account_tbl (id, user_id, money)
values (1, "U00001", 10000
)-- 插入库存,C00001 存量1000
insert into stock_tbl (id, commodity_code, `count`)
values (1, "C00001", 1000
)
此外由于流程上是先扣库存,在扣余额,因此设定库存总价 1000 * 200 远远高于 用户账户余额, 这也符合实际。另外在扣款实现上增加余额不足的逻辑判定
@Overridepublic void debit(String userId, int money) {LOGGER.info("Account Service ... xid: " + RootContext.getXID());LOGGER.info("Deducting balance SQL: update account_tbl set money = money - {} where user_id = {}", money,userId);Integer balance = getAccountBalance(userId);if (balance < money) {LOGGER.error("Account Service ... " + userId + " " + money + " balance is not enough");throw new RuntimeException("");}jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[]{money, userId});LOGGER.info("Account Service End ... ");}public Integer getAccountBalance(String userId) {String sql = "SELECT money FROM account_tbl WHERE user_id = ?";try {return jdbcTemplate.queryForObject(sql, new Object[]{userId}, Integer.class);} catch (Exception e) {e.printStackTrace();return null;}}
准备就绪,重启代码,调用controller测试,浏览器输入路径
数据库初始状态
http://127.0.0.1:8080/business/purchase?userId=U00001&commodityCode=C00001&orderCount=10
- 第一次调用:
- 观察数据库数据
- 调用成功,数据变化正确
- 第二次调用:
- 查看服务日志,发生了二阶段回滚
- 观察数据库数据
- 数据无变化,全局事务有效
- 第N次调用后,发现再也无法成功调用,观察数据库发现余额不足了
- 但是此处无法显示异常的消息,应该是seata代理了调用链,修改了异常信息
至此,初体验结束。
另附dubbo使用zk作为注册中心时的znode结构
几个关键点
- registry.conf 文件,在
io.seata.ConfigurationFactory 工厂类中加载
,默认存放到classpath 跟路径中即可。registry.conf文件主要用于配制seata服务的地址,服务注册和配置中心。
- @GlobalTransactional 注解的使用
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Inherited
public @interface GlobalTransactional {/*** Global transaction timeoutMills in MILLISECONDS.* If client.tm.default-global-transaction-timeout is configured, It will replace the DefaultValues* .DEFAULT_GLOBAL_TRANSACTION_TIMEOUT.** @return timeoutMills in MILLISECONDS.*/int timeoutMills() default DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;/*** Given name of the global transaction instance.** @return Given name.*/String name() default "";/*** roll back for the Class** @return the class array of the rollback for*/Class<? extends Throwable>[] rollbackFor() default {};/*** roll back for the class name** @return the class name of rollback for*/String[] rollbackForClassName() default {};/*** not roll back for the Class** @return the class array of no rollback for*/Class<? extends Throwable>[] noRollbackFor() default {};/*** not roll back for the class name** @return string [ ]*/String[] noRollbackForClassName() default {};/*** the propagation of the global transaction** @return propagation*/Propagation propagation() default Propagation.REQUIRED;/*** customized global lock retry interval(unit: ms)* you may use this to override global config of "client.rm.lock.retryInterval"* note: 0 or negative number will take no effect(which mean fall back to global config)** @return int*/int lockRetryInterval() default 0;/*** customized global lock retry interval(unit: ms)* you may use this to override global config of "client.rm.lock.retryInterval"* note: 0 or negative number will take no effect(which mean fall back to global config)** @return int*/@Deprecatedint lockRetryInternal() default 0;/*** customized global lock retry times* you may use this to override global config of "client.rm.lock.retryTimes"* note: negative number will take no effect(which mean fall back to global config)** @return int*/int lockRetryTimes() default -1;/*** pick the Acquire lock policy** @return lock strategy mode*/LockStrategyMode lockStrategyMode() default LockStrategyMode.PESSIMISTIC;}
- 自动注入类及其自定义点和扩展点
- SeataCoreAutoConfiguration
- SeataCoreAutoConfiguration
�
�