golang源码分析:seata-go (1)at模式
上一讲mac 上学习k8s系列(53)seata-go介绍了如何在本地部署seata,本文从源码角度分析下seata-go的具体实现:
首先看下at模式下的最基础的例子:sample/at/basic/main.go,它的main函数很简单
func main() {
client.Init()
initService()
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Timeout: time.Second * 30,
}, updateData)
<-make(chan struct{})
}
1,客户端的初始化,里面实现了Rm和Tm的初始化,目前是空的实现,使用者可以根据自己的需求来实现初始化。
client.Init()
initRmClient()
initTmClient()
其中initTmClient函数实现了调用了两个函数
initConfig()
initRemoting()
后者调用了
getty.InitRpcClient()
它调用了
rpcClient.init()
它利用github.com/apache/dubbo-getty 初始化了grpc链接,维持我们的tm和seata server之间的通信。
type Client interface {
EndPoint
}
addressList := getAvailServerList()
gettyClient := getty.NewTCPClient(
gxsync.NewTaskPoolSimple(0)
go gettyClient.RunEventLoop(c.newSession)
链接成功后通过协程维持链接心跳的刷新。
2,mysql链接的初始化
initService()
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true")
其中
SeataATMySQLDriver = "seata-at-mysql"
在初始化的时候注册了at模式下的mysql driver,pkg/datasource/sql/driver.go
SeataATMySQLDriver = "seata-at-mysql"
SeataXAMySQLDriver = "seata-xa-mysql"
func init()
sql.Register(SeataATMySQLDriver, &seataATDriver{
seataDriver: &seataDriver{
transType: types.ATMode,
sql.Register(SeataXAMySQLDriver, &seataXADriver{
seataDriver: &seataDriver{
transType: types.XAMode,
type seataATDriver struct {
*seataDriver
}
这个driver实现了sql语句的解析,方便记录undo和redolog
func (d *seataATDriver) OpenConnector(name string) (c driver.Connector, err error)
connector, err := d.seataDriver.OpenConnector(name)
func (d *seataDriver) Open(name string) (driver.Conn, error) {
}
func (d *seataDriver) OpenConnector(name string) (c driver.Connector, err error) {
proxy, err := getOpenConnectorProxy(c, dbType, sql.OpenDB(c), name)
类似的还有XA driver的实现
type seataXADriver struct {
*seataDriver
}
func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err error)
type seataDriver struct {
transType types.TransactionType
target driver.Driver
}
3,注册全局事务id,执行我们的任务,并根据执行结果实现提交或者回滚。
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Timeout: time.Second * 30,
}, updateData)
在全局事务的ctx执行我们的updateData任务
其中配置的默认配置定义在pkg/config/client_config.go
func GetClientConfig() *ClientConfig {
GetDefaultGettyConfig(),
pkg/tm/transaction_executor.go中定义了我们最核心的函数:
func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) {
seataContextVariable = ContextParam("seataContextVariable")
if IsGlobalTx(ctx) {
ctx = transferTx(ctx)
SetXID(newCtx, GetXID(ctx))
variable.(*ContextVariable).Xid = xid
re = begin(ctx, gc)
defer func() {
var err error
// no need to do second phase if propagation is some type e.g. NotSupported.
if IsGlobalTx(ctx) {
// business maybe to throw panic, so need to recover it here.
if err = commitOrRollback(ctx, recover() == nil && re == nil); err != nil
}
}
re = business(ctx)
}
可以看到这就是核心逻辑:
A,在context中获取全局事务id,如果没有获取到,到searver上去获取
B,执行我们的本地事务
C,根据本地事务返回的成功失败,确定是回滚还是提交。
func begin(ctx context.Context, gc *GtxConfig) error {
return beginNewGtx(ctx, gc)
}
func beginNewGtx(ctx context.Context, gc *GtxConfig) error {
GetGlobalTransactionManager().Begin(ctx, timeout)
}
func (g *GlobalTransactionManager) Begin(ctx context.Context, timeout time.Duration) error {
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}
type GtxConfig struct {
Timeout time.Duration
Name string
Propagation Propagation
LockRetryInternal time.Duration
LockRetryTimes int16
}
提交事务或者回滚事务的流程如下,根据当前角色确认下一步该如何操作:
func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
case Launcher:
if tx := GetTx(ctx); isSuccess {
GetGlobalTransactionManager().Commit(ctx, tx);
re = GetGlobalTransactionManager().Rollback(ctx, tx);
case Participant:
case UnKnow:
提交
func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
req := message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req);
bf.Wait()
回滚
func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
req := message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err ==
bf.Wait()
其中全局事务的定义在pkg/tm/context.go
type GlobalTransaction struct {
Xid string
XidCopy string
TxName string
// TxStatus Identify a global transaction in a certain status
TxStatus message.GlobalStatus
// TxRole Roles in the transaction propagation behavior
TxRole GlobalTransactionRole
}
事务的相关状态定义pkg/tm/constant.go
type Propagation int8
Required
RequiresNew
NotSupported
Supports
Never
Mandatory
当前协程所担任的角色
const (
UnKnow = GlobalTransactionRole(0)
Launcher = GlobalTransactionRole(1)
Participant = GlobalTransactionRole(2)
)
全局事务的状态
GlobalStatus
GlobalStatusUnKnown
GlobalStatusBegin
GlobalStatusCommitting
GlobalStatusCommitRetrying
GlobalStatusRollbacking
GlobalStatusRollbackRetrying
GlobalStatusTimeoutRollbacking
GlobalStatusTimeoutRollbackRetrying
GlobalStatusAsyncCommitting
GlobalStatusCommitted
GlobalStatusCommitFailed
GlobalStatusRollbacked
GlobalStatusRollbackFailed
GlobalStatusTimeoutRollbacked
GlobalStatusTimeoutRollbackFailed
GlobalStatusFinished
可以看到at模式其实本质上是通过一个全局锁在事务分支的分支锁实现事务的隔离的,下一讲我们通过源码分析下tcc模式。
相关文章
- Typora+Picgo+Gitee图床
- 快速入门Golang Fuzz模糊测试
- Git版本控制入门教程(一)
- Git版本控制教程之为项目打上标签(二)
- Git版本控制教程之分支(三)
- Git版本控制教程之在Visual Studio Code中如何使用(四)
- 如何托管你的项目到github上详细教程
- Git提交记录
- 无法下载 http://dl.google.com/linux/chrome/deb/dists/stable/main/binary-amd64/Packages
- git提交报错does not match your user account
- 导出简书的文章生成gitbook上传到github通过xxx.github.io访问
- Git reset 之后 怎么恢复到 reset 之前的节点
- 准大三学生给想学习C++同学的学习路线
- 5000字用C++带你入门马氏链。
- Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)
- MongoDB从入门到实战之MongoDB简介
- vitepress+gitee pages搭建自己的博客网站
- git clone 拉取远程仓库
- mac 系统 homebrew 管理 PHP
- PHP 冒泡排序算法