zl程序教程

您现在的位置是:首页 >  后端

当前栏目

golang源码分析:seata-go (2)tcc模式

Go源码模式Golang 分析 seata TCC
2023-06-13 09:15:58 时间

在介绍了seata-go如何部署mac 上学习k8s系列(53)seata-go和如何使用at模式golang源码分析:seata-go (1)at模式后,本文开始介绍如何使用tcc模式。

tcc模式最简单的例子是本地模式sample/tcc/local/cmd/local.go

    client.Init()
    tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
    Name: "TccSampleLocalGlobalTx",
  }, business)

它的初始化方式和开启全局事务的方式和at模式是一样的,不同的是它的business逻辑的实现,tcc模式需要每个业务自己实现Prepare,commit,cancel方法,业务侵入性更大。

func business(ctx context.Context) (re error) {
    if _, re = service.NewTestTCCServiceBusiness1Proxy().Prepare(ctx, 1);
    if _, re = service.NewTestTCCServiceBusiness2Proxy().Prepare(ctx, 3)

在实现我们的业务逻辑的时候需要通过代理模式TCCServiceProxy包裹下我们的TCCResource

pkg/rm/tcc/tcc_service.go

type TCCServiceProxy struct {
  referenceName        string
  registerResourceOnce sync.Once
  *TCCResource
}

其中pkg/rm/tcc/tcc_resource.go

type TCCResource struct {
  ResourceGroupId string `default:"DEFAULT"`
  AppName         string
  *rm.TwoPhaseAction
}

下面看下我们本地的两个service是如何包裹实现的

sample/tcc/local/service/service.go

func NewTestTCCServiceBusiness1Proxy() *tcc.TCCServiceProxy {
      tccService, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness{})
func NewTestTCCServiceBusiness2Proxy() *tcc.TCCServiceProxy {
      tccService2, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness2{})

解析传入的接口,然后注入到TCCResource里面pkg/rm/tcc/tcc_service.go

func NewTCCServiceProxy(service interface{}) (*TCCServiceProxy, error) {
    tccResource, err := ParseTCCResource(service)
    proxy := &TCCServiceProxy{
       TCCResource: tccResource,
     }
     return proxy, proxy.RegisterResource()
  }
func (t *TCCServiceProxy) RegisterResource() error {
  rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
}

pkg/rm/tcc/tcc_resource.go

    func ParseTCCResource(v interface{}) (*TCCResource, error) {
      t, err := rm.ParseTwoPhaseAction(v)

解析过程是通过反射来判断我们的service是否实现了需要的tcc接口

func ParseTwoPhaseAction(v interface{}) (*TwoPhaseAction, error) {
  if m, ok := v.(TwoPhaseInterface); ok {
    return parseTwoPhaseActionByTwoPhaseInterface(m), nil
  }
  return ParseTwoPhaseActionByInterface(v)
}
func parseTwoPhaseActionByTwoPhaseInterface(v TwoPhaseInterface) *TwoPhaseAction {
  value := reflect.ValueOf(v)
  mp := value.MethodByName("Prepare")
  mc := value.MethodByName("Commit")
  mr := value.MethodByName("Rollback")

对于结构体,看它的属性里面是否实现了上述三个函数

func ParseTwoPhaseActionByInterface(v interface{}) (*TwoPhaseAction, error) {
  valueOfElem := reflect.ValueOf(v).Elem()
  for i := 0; i < numField; i++ {
    t := typeOf.Field(i)
    f := valueOfElem.Field(i)
    if ms, m, ok := getPrepareAction(t, f); ok {
      hasPrepareMethodName = true
      result.prepareMethod = m
      result.prepareMethodName = ms
    } else if ms, m, ok = getCommitMethod(t, f); ok {
      hasCommitMethodName = true
      result.commitMethod = m
      result.commitMethodName = ms
    } else if ms, m, ok = getRollbackMethod(t, f); ok {
      hasRollbackMethod = true
      result.rollbackMethod = m
      result.rollbackMethodName = ms
    }

解析完以后就是把我们的service注入进去

func (t *TCCServiceProxy) RegisterResource() error {
  var err error
  t.registerResourceOnce.Do(func() {
    err = rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)

然后可以看下我们实现的两个service

type TestTCCServiceBusiness struct{}
type TestTCCServiceBusiness2 struct{}

他们都实现了Prepare、Commit、Rollback三个方法,我们在main里面调用的其实是proxy的prapare方法,我们可以看看它是如何实现的

func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (interface{}, error) {
        if tm.IsGlobalTx(ctx) {
            err := t.registeBranch(ctx, params)
             tm.SetFencePhase(ctx, enum.FencePhasePrepare)
  return t.TCCResource.Prepare(ctx, params)
        variable.(*ContextVariable).FencePhase = phase

首先注册本地分支事物id,然后执行本地定义的Prepare方法。分支事务的注册过程如下:

func (t *TCCServiceProxy) registeBranch(ctx context.Context, params interface{}) error {
      tccContext := t.initBusinessActionContext(ctx, params)
      tccContext.Xid = tm.GetXID(ctx)
      actionContext := t.initActionContext(params)
      
      actionContext[constant.PrepareMethod] = t.TCCResource.TwoPhaseAction.GetPrepareMethodName()
  actionContext[constant.CommitMethod] = t.TCCResource.TwoPhaseAction.GetCommitMethodName()
  actionContext[constant.RollbackMethod] = t.TCCResource.TwoPhaseAction.GetRollbackMethodName()
      branchId, err := rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{
    BranchType:      branch.BranchTypeTCC,
    ResourceId:      t.GetActionName(),
    ClientId:        "",
    Xid:             tm.GetXID(ctx),
    ApplicationData: string(applicationData),
    LockKeys:        "",
  })
        resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
      tm.SetBusinessActionContext(ctx, tccContext)

它定义了分支context然后通过getty发送给了seata-server 事务管理器。