zl程序教程

您现在的位置是:首页 >  后端

当前栏目

springCloud Alibaba seata 分布式事务

2023-09-11 14:19:18 时间

事务是指一个操作单元,在这个操作单元中所有操作最终要保持一致的行为。

要么所有操作都成功,要么所有操作都被撤销。

本地事务: 

 分布式事务:

 分布式事务场景

  • 单体系统访问多个数据库 

   

  •  多个微服务访问同一个数据库

  •  多个微服务访问多个数据库

 分布式事务解决方案

 1、全局事务

 

 

 2、可靠消息服务

 

 

 

 3、最大努力通知

 

 4、TCC事务

 

 

 Seata介紹

 

 

 

Seata实现分布式事务

实例:

 订单服务代码:

package com.wdz.cloud.order.controller;

import com.wdz.cloud.common.model.Order;
import com.wdz.cloud.order.service.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/user/order")
public class UserOrderController {

    @Autowired
    private IOrderService orderService;

    @GetMapping("create/{pid}/{userId}")
    public Order createOrder(@PathVariable("pid")Long pid, @PathVariable("userId")Long userId){
        return orderService.createOrder(pid,userId);
    }


}
package com.wdz.cloud.order.service.impl;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.fastjson.JSONObject;
import com.wdz.cloud.common.model.Order;
import com.wdz.cloud.common.model.Product;
import com.wdz.cloud.common.model.RocketMqTxLog;
import com.wdz.cloud.common.model.User;
import com.wdz.cloud.order.dao.OrderDao;
import com.wdz.cloud.order.dao.RocketMqTxLogDao;
import com.wdz.cloud.order.feign.FeignProductService;
import com.wdz.cloud.order.feign.FeignUserService;
import com.wdz.cloud.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

import java.util.Date;
import java.util.UUID;
@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {

    @Autowired
    private OrderDao orderDao;
    @Autowired
    private FeignProductService feignProductService;
    @Autowired
    private FeignUserService feignUserService;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Order createOrder(Long pid, Long userId) {
        Product product = feignProductService.findById(pid);
        log.info("-----下单商品:{}", JSONObject.toJSONString(product));
        User user = feignUserService.findById(userId);
        Order order = new Order();
        order.setProductId(pid);
        order.setProductName(product.getName());
        order.setProductNum(5);
        order.setUserId(user.getId());
        order.setUserName(user.getUserName());
        order.setProductPrice(product.getPrice());
        orderDao.save(order);
        log.info("------订单创建完成---{}",JSONObject.toJSONString(order));
        // 扣库存
        feignProductService.reduceStock(pid,5);
        return order;
    }
}

 feign代码

package com.wdz.cloud.order.feign;

import com.wdz.cloud.common.model.Product;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * fallback 用于指定远程异常返回类
 */
@FeignClient(value = "wdz-product"
//,fallbackFactory = FallbackProductException.class
)
public interface FeignProductService {


    @RequestMapping("/query/{id}")
    Product findById(@PathVariable Long id);

    @PostMapping("/reduceStock")
    void reduceStock(@RequestParam("pid") Long pid,@RequestParam("num") Integer num);
}
package com.wdz.cloud.order.feign;

import com.wdz.cloud.common.model.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

@FeignClient(value = "wdz-user")
public interface FeignUserService {


    @RequestMapping("/query/{userId}")
    User findById(@PathVariable Long userId);

}

商品服务代码:

package com.wdz.cloud.product.controller;

import com.wdz.cloud.common.model.Product;
import com.wdz.cloud.product.service.IProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProductController {

    @Autowired
    private IProductService productService;

    @GetMapping("/query/{id}")
    public Product query(@PathVariable("id") Long id) {
        return productService.query(id);
    }
    @PostMapping("/reduceStock")
    public void reduceStock(Long pid,Integer num){
        productService.reduceStock(pid,num);
    }


}
package com.wdz.cloud.product.service.impl;

import com.wdz.cloud.common.model.Product;
import com.wdz.cloud.product.dao.ProductDao;
import com.wdz.cloud.product.service.IProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProductServiceImpl implements IProductService {

    @Autowired
    private ProductDao productDao;

    @Override
    public Product query(Long id) {
        Product product = productDao.findById(id).get();
        return product;
    }

    @Override
    public void reduceStock(Long pid, Integer num) {
        Product product = productDao.findById(pid).get();
        Integer stock = product.getStock();
        product.setStock(stock-num);
        productDao.save(product);
    }
}

以上代码运行正常,订单及库存数据正常

 模拟异常

 修改商品减库存业务,模拟异常,订单创建成功,库存减少失败

@Override
public void reduceStock(Long pid, Integer num) {
    Product product = productDao.findById(pid).get();
    Integer stock = product.getStock();
    product.setStock(stock-num);
        
    int i = 1/0;
        
    productDao.save(product);
}

使用Seata解决异常问题

下载地址:下载中心

seata配置修改registry.conf:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 注册中心,根据服务注册中心进行配置,默认是file
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "public"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  # 配置中心,默认file,根据自己的服务进行配置
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "public"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "http://192.168.1.204:8801"
    apolloConfigService = "http://192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

我配置的:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 注册中心,根据服务注册中心进行配置,默认是file
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "public"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  # 配置中心,默认file,根据自己的服务进行配置
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "public"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}

如果使用file方式,则继续修改file.conf 中的数据库等配置即可

查看seata 文档

# 脚本说明

## [client](https://github.com/seata/seata/tree/develop/script/client) 

> 存放用于客户端的配置和SQL

- at: AT模式下的 `undo_log` 建表语句
- conf: 客户端的配置文件
- saga: SAGA 模式下所需表的建表语句
- spring: SpringBoot 应用支持的配置文件

## [server](https://github.com/seata/seata/tree/develop/script/server)

> 存放server侧所需SQL和部署脚本

- db: server 侧的保存模式为 `db` 时所需表的建表语句
- docker-compose: server 侧通过 docker-compose 部署的脚本
- helm: server 侧通过 Helm 部署的脚本
- kubernetes: server 侧通过 Kubernetes 部署的脚本

## [config-center](https://github.com/seata/seata/tree/develop/script/config-center)

> 用于存放各种配置中心的初始化脚本,执行时都会读取 `config.txt`配置文件,并写入配置中心

- nacos: 用于向 Nacos 中添加配置
- zk: 用于向 Zookeeper 中添加配置,脚本依赖 Zookeeper 的相关脚本,需要手动下载;ZooKeeper相关的配置可以写在 `zk-params.txt` 中,也可以在执行的时候输入
- apollo: 向 Apollo 中添加配置,Apollo 的地址端口等可以写在 `apollo-params.txt`,也可以在执行的时候输入
- etcd3: 用于向 Etcd3 中添加配置
- consul: 用于向 consul 中添加配置

https://github.com/seata/seata/tree/develop/script/config-center/nacos 下载 nacos-config.sh

https://github.com/seata/seata/tree/develop/script/config-center   下载config.txt

nacos-config.sh和config.txt放在同一个文件夹下,打开命令行工具,执行命令向nacos中添加配置文件:

#  nacos-config.sh 127.0.01

如果执行命令之后闪退,是config.txt没有读取到,更改nacos-config.sh文件95行

for line in $(cat ./config.txt | sed s/[[:space:]]//g); do

启动seata,进入bin目录使用命令窗口执行:

#   .\seata-server.bat -p 8001 -m file

TC事务协调器,即seata 服务

TM事务管理器 使用GlobaTransactional注解开启事物

RM资源管理器,即数据库。运行sql创建表

CREATE TABLE `undo_log` (
 
  `id` bigint NOT NULL AUTO_INCREMENT,
 
  `branch_id` bigint NOT NULL,
 
  `xid` varchar(100) NOT NULL,
 
  `context` varchar(128) NOT NULL,
 
  `rollback_info` longblob NOT NULL,
 
  `log_status` int NOT NULL,
 
  `log_created` datetime NOT NULL,
 
  `log_modified` datetime NOT NULL,
 
  PRIMARY KEY (`id`),
 
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
 
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

引入依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

创建代理数据源:DataSourceProxyConfig

Seata是通过代理数据源实现事务分支的,需要配置io.seata.rm.datasource.DataSourceProxy的Bean

package com.wdz.cloud.order.config;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class DataSourceProxyConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource(){
        return new DruidDataSource();
    }
    @Primary
    @Bean
    public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }


}

将seata中registry.conf文件放到项目的resources下

修改bootstrap.yml

spring:
  application:
    name: wdz-product
  cloud:
      nacos:
        config:
          server-addr: localhost:8848 #nacos 的服务地址
          file-extension: yaml # 配置格式
          shared-dataids: all-server-config.yaml  # 配置要引入的配置
          refreshable-dataids: all-server-config.yaml # 配置实现动态配置刷新的配置
          group: SEATA_GROUP # 配置分组 默认值:DEFAULT_GROUP,如果需要进行分组管理的时候将同一组配置中的group设置一致即可
      alibaba:
        seata:
          # 该配置项要与config.txt中的配置项后缀一致:
          # service.vgroupMapping.wdz-product=detault
          # service.vgroupMapping.wdz-order=detault
          # service.vgroupMapping.default_tx_group=default
          tx-service-group: ${spring.application.name}
  profiles:
    active: test  # 环境标识 dev 开发环境

 

 

待续。。。。。。。