zl程序教程

您现在的位置是:首页 >  其他

当前栏目

golang源码分析:seata-go (1)at模式

2023-02-18 16:32:25 时间

上一讲mac 上学习k8s系列(53)seata-go介绍了如何在本地部署seata,本文从源码角度分析下seata-go的具体实现:

首先看下at模式下的最基础的例子:sample/at/basic/main.go,它的main函数很简单

func main() {
  client.Init()
  initService()
  tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
    Name:    "ATSampleLocalGlobalTx",
    Timeout: time.Second * 30,
  }, updateData)
  <-make(chan struct{})
}

1,客户端的初始化,里面实现了Rm和Tm的初始化,目前是空的实现,使用者可以根据自己的需求来实现初始化。

client.Init()
  initRmClient()
  initTmClient()

其中initTmClient函数实现了调用了两个函数

initConfig()
initRemoting()

后者调用了

getty.InitRpcClient()

它调用了

rpcClient.init()

它利用github.com/apache/dubbo-getty 初始化了grpc链接,维持我们的tm和seata server之间的通信。

type Client interface {
  EndPoint
}
addressList := getAvailServerList()
gettyClient := getty.NewTCPClient(
  gxsync.NewTaskPoolSimple(0)
go gettyClient.RunEventLoop(c.newSession)

链接成功后通过协程维持链接心跳的刷新。

2,mysql链接的初始化

initService()
  db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")

其中

SeataATMySQLDriver = "seata-at-mysql"

在初始化的时候注册了at模式下的mysql driver,pkg/datasource/sql/driver.go

SeataATMySQLDriver = "seata-at-mysql"
SeataXAMySQLDriver = "seata-xa-mysql"

func init() 
     sql.Register(SeataATMySQLDriver, &seataATDriver{
    seataDriver: &seataDriver{
      transType: types.ATMode,
        sql.Register(SeataXAMySQLDriver, &seataXADriver{
    seataDriver: &seataDriver{
      transType: types.XAMode,
    type seataATDriver struct {
  *seataDriver
}

这个driver实现了sql语句的解析,方便记录undo和redolog

func (d *seataATDriver) OpenConnector(name string) (c driver.Connector, err error) 
        connector, err := d.seataDriver.OpenConnector(name)
func (d *seataDriver) Open(name string) (driver.Conn, error) {
}
func (d *seataDriver) OpenConnector(name string) (c driver.Connector, err error) {
        proxy, err := getOpenConnectorProxy(c, dbType, sql.OpenDB(c), name)

类似的还有XA driver的实现

type seataXADriver struct {
  *seataDriver
}

func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err error) 
    type seataDriver struct {
  transType types.TransactionType
  target    driver.Driver
}

3,注册全局事务id,执行我们的任务,并根据执行结果实现提交或者回滚。

tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
    Name:    "ATSampleLocalGlobalTx",
    Timeout: time.Second * 30,
  }, updateData)

在全局事务的ctx执行我们的updateData任务

其中配置的默认配置定义在pkg/config/client_config.go

  func GetClientConfig() *ClientConfig {
        GetDefaultGettyConfig(),

pkg/tm/transaction_executor.go中定义了我们最核心的函数:

func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) {
      seataContextVariable = ContextParam("seataContextVariable")
        if IsGlobalTx(ctx) {
             ctx = transferTx(ctx)
             SetXID(newCtx, GetXID(ctx))
             variable.(*ContextVariable).Xid = xid
       
       re = begin(ctx, gc)
       
    defer func() {
      var err error
       // no need to do second phase if propagation is some type e.g. NotSupported.
       if IsGlobalTx(ctx) {
         // business maybe to throw panic, so need to recover it here.
          if err = commitOrRollback(ctx, recover() == nil && re == nil); err != nil 
          }
     }
  re = business(ctx)
 }

可以看到这就是核心逻辑:

A,在context中获取全局事务id,如果没有获取到,到searver上去获取

B,执行我们的本地事务

C,根据本地事务返回的成功失败,确定是回滚还是提交。

func begin(ctx context.Context, gc *GtxConfig) error {
    return beginNewGtx(ctx, gc)
}
func beginNewGtx(ctx context.Context, gc *GtxConfig) error {
  GetGlobalTransactionManager().Begin(ctx, timeout)
}
func (g *GlobalTransactionManager) Begin(ctx context.Context, timeout time.Duration) error {
              res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
      return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}
type GtxConfig struct {
  Timeout           time.Duration
  Name              string
  Propagation       Propagation
  LockRetryInternal time.Duration
  LockRetryTimes    int16
}

提交事务或者回滚事务的流程如下,根据当前角色确认下一步该如何操作:

func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
      case Launcher:
       if tx := GetTx(ctx); isSuccess {
        GetGlobalTransactionManager().Commit(ctx, tx);
        re = GetGlobalTransactionManager().Rollback(ctx, tx);
      case Participant:
      case UnKnow:

提交

func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
        req := message.GlobalCommitRequest{
    AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
  }
   for bf.Ongoing() {
    if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req);
      bf.Wait()

回滚

func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
        req := message.GlobalRollbackRequest{
    AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
  }
  for bf.Ongoing() {
    if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == 
      bf.Wait()

其中全局事务的定义在pkg/tm/context.go

type GlobalTransaction struct {
  Xid     string
  XidCopy string
  TxName  string
  // TxStatus Identify a global transaction in a certain status
  TxStatus message.GlobalStatus
  // TxRole Roles in the transaction propagation behavior
  TxRole GlobalTransactionRole
}

事务的相关状态定义pkg/tm/constant.go

type Propagation int8
      Required
      RequiresNew
      NotSupported
      Supports
      Never
      Mandatory

当前协程所担任的角色

const (
  UnKnow      = GlobalTransactionRole(0)
  Launcher    = GlobalTransactionRole(1)
  Participant = GlobalTransactionRole(2)
)

全局事务的状态

GlobalStatus
      GlobalStatusUnKnown
      GlobalStatusBegin
      GlobalStatusCommitting
      GlobalStatusCommitRetrying
      GlobalStatusRollbacking
      GlobalStatusRollbackRetrying
      GlobalStatusTimeoutRollbacking
      GlobalStatusTimeoutRollbackRetrying
      GlobalStatusAsyncCommitting
      GlobalStatusCommitted
      GlobalStatusCommitFailed
      GlobalStatusRollbacked
      GlobalStatusRollbackFailed
      GlobalStatusTimeoutRollbacked
      GlobalStatusTimeoutRollbackFailed
      GlobalStatusFinished

可以看到at模式其实本质上是通过一个全局锁在事务分支的分支锁实现事务的隔离的,下一讲我们通过源码分析下tcc模式。