zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Seata-Saga模式 原理

模式原理 seata saga
2023-06-13 09:14:39 时间

大家好,又见面了,我是你们的朋友全栈君。

1 Saga模式示例

1.1 Saga状态机工具

状态机设计组件:seata-saga-statemachine-designer 状态机在线画图工具:saga_designer

1.2 代码示例

github上Seata-sample有完整的示例代码,Seata Saga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。

1.2.1 初始化db

mysql示例:

-- Seata Saga state-machine storage schema (MySQL).
--
-- All three tables use utf8mb4 rather than the deprecated 3-byte `utf8`
-- alias, so TEXT columns holding serialized parameters can store any Unicode
-- character. The widest indexed column is VARCHAR(128), i.e. 512 bytes under
-- utf8mb4, which stays within InnoDB's 767-byte per-column index limit.
-- Keep the three tables on the same charset: `machine_inst_id` and
-- `machine_id` are compared against the `id` columns of the other tables.

-- State machine definitions: one row per registered definition.
CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
    `id`               VARCHAR(32)  NOT NULL COMMENT 'id',
    `name`             VARCHAR(128) NOT NULL COMMENT 'name',
    `tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',
    `app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',
    `type`             VARCHAR(20)  COMMENT 'state language type',
    `comment_`         VARCHAR(255) COMMENT 'comment',
    `ver`              VARCHAR(16)  NOT NULL COMMENT 'version',
    `gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',
    `status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',
    `content`          TEXT COMMENT 'content',
    `recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- State machine instances: one row per started state machine.
-- (business_key, tenant_id) is unique, so a business key can be started
-- at most once per tenant.
CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
    `id`                  VARCHAR(128)            NOT NULL COMMENT 'id',
    `machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',
    `tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',
    `parent_id`           VARCHAR(128) COMMENT 'parent id',
    `gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',
    `business_key`        VARCHAR(48) COMMENT 'business key',
    `start_params`        TEXT COMMENT 'start parameters',
    `gmt_end`             DATETIME(3) COMMENT 'end time',
    `excep`               BLOB COMMENT 'exception',
    `end_params`          TEXT COMMENT 'end parameters',
    `status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    `gmt_updated`         DATETIME(3) NOT NULL COMMENT 'update time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- State instances: one row per executed state, keyed by
-- (id, machine_inst_id).
CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
    `id`                       VARCHAR(48)  NOT NULL COMMENT 'id',
    `machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
    `name`                     VARCHAR(128) NOT NULL COMMENT 'state name',
    `type`                     VARCHAR(20)  COMMENT 'state type',
    `service_name`             VARCHAR(128) COMMENT 'service name',
    `service_method`           VARCHAR(128) COMMENT 'method name',
    `service_type`             VARCHAR(16) COMMENT 'service type',
    `business_key`             VARCHAR(48) COMMENT 'business key',
    `state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
    `state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',
    `gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',
    `is_for_update`            TINYINT(1) COMMENT 'is service for update',
    `input_params`             TEXT COMMENT 'input parameters',
    `output_params`            TEXT COMMENT 'output parameters',
    `status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `excep`                    BLOB COMMENT 'exception',
    `gmt_updated`              DATETIME(3) COMMENT 'update time',
    `gmt_end`                  DATETIME(3) COMMENT 'end time',
    PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

1.2.2 bean配置

<!-- Pooled H2 in-memory data source used by the state machine engine. -->
<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
    <constructor-arg>
        <bean class="org.h2.jdbcx.JdbcDataSource">
            <property name="URL" value="jdbc:h2:mem:seata_saga" />
            <property name="user" value="sa" />
            <property name="password" value="sa" />
        </bean>
    </constructor-arg>
</bean>

<!-- Runs the schema script against the data source above. -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="classpath:sql/h2_init.sql" />
</jdbc:initialize-database>

<!-- The engine itself; all of its settings come from dbStateMachineConfig. -->
<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
    <property name="stateMachineConfig" ref="dbStateMachineConfig" />
</bean>

<!-- Engine configuration: data source, state machine definition files, async settings, application identity. -->
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
    <property name="dataSource" ref="dataSource" />
    <property name="resources" value="statelang/*.json" />
    <!-- Asynchronous starts are rejected by the engine unless this is true. -->
    <property name="enableAsync" value="true" />
    <property name="threadPoolExecutor" ref="threadExecutor" />
    <property name="applicationId" value="saga_sample" />
    <property name="txServiceGroup" value="my_test_tx_group" />
</bean>

<!-- Thread pool handed to the configuration above (1 core thread, up to 20). -->
<bean id="threadExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
    <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="20" />
</bean>

<!-- Exposes the engine through StateMachineEngineHolder. -->
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
    <property name="stateMachineEngine" ref="stateMachineEngine" />
</bean>

1.2.3 Saga状态机配置示例

{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[reduceInventoryResult] == true",
"Next":"ReduceBalance"
}
],
"Default":"Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException" : "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type":"Succeed"
},
"Fail": {
"Type":"Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}

1.2.4 启动状态机

创建StateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
执行-同步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
执行-异步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);

2 状态机设置

2.1 状态机属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个“状态”
  • States: 状态列表,是一个map结构,key是“状态”的名称,在状态机内必须唯一

2.2 状态属性

  • Type: “状态”的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  • ServiceName: 服务名称,通常是服务的beanId
  • ServiceMethod: 服务方法名称
  • CompensateState: 该“状态”的补偿“状态”
  • Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达式使用的是SpringEL, 如果是常量直接写值即可
  • Output: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上下文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  • Catch: 捕获到异常后的路由
  • Next: 服务执行完成后下一个执行的“状态”
  • Choices: Choice类型的“状态”里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个“状态”
  • ErrorCode: Fail类型“状态”的错误码
  • Message: Fail类型“状态”的错误信息

3 原理分析

Saga模式是一种长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者。

3.1 Saga状态机引擎架构

注:此图来自seata官网的博客。

状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:

  • Eventing层: 实现事件驱动架构,可以压入事件并由消费端消费事件,本层不关心事件是什么消费端执行什么,由上层实现。
  • ProcessController层: 由下层的Eventing驱动一个“空”流程引擎的执行,“state”的行为和路由都未实现,由上层实现;基于以上两层理论上可以自定义扩展任何“流程”引擎。
  • StateMachineEngine层: 实现状态机引擎每种state的行为和路由逻辑;提供API、状态机语言仓库。

3.2 Saga状态机引擎

注:此图来自seata官网的博客。

注:此图来自seata官网的博客。 Saga模式下,事务会根据json配置的state来执行,如果前一个state的正向服务执行成功,那么就路由到下一个state并执行下一个state的正向服务,如果执行失败,那么基于CompensateState属性执行补偿服务。

从代码的角度来看,Saga执行过程如下:

3.3 分布式事务的时序图

从时序图上可以看到,Saga模式和AT、TCC模式有较大的差异:

  • Saga模式下TM、RM均由开启事务的微服务承担,AT、TCC模式的TM、RM一般是分开的。
  • Saga模式由一个状态机实现,其实现过程主要包括:路由到正确的state + 执行state。

4 源码分析

4.1 TM开启事务

4.1.1 通过StateMachineEngine开启事务

Saga的分布式事务由StateMachineEngine开启。目前支持同步、异步方式,示例代码如下:

StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) ;
StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;

StateMachineEngine开启事务,主要包括以下几件事情:

  • 创建执行上下文ProcessContext。
  • 向TC开启全局事务,并返回xid。
  • 将state持久化到本地(DB)。
  • 向EventBus中发布Event。
/**
 * Creates a state machine instance, builds its process context and publishes that
 * context for execution.
 *
 * @param stateMachineName name of the state machine to start
 * @param tenantId         tenant id; when empty, the default tenant id of the configuration is used
 * @param businessKey      business key handed to createMachineInstance
 * @param startParams      start parameters copied into the context variables; may be null
 * @param async            true to publish through the asynchronous event publisher
 * @param callback         callback attached to the process context
 * @return the created state machine instance
 * @throws EngineExecutionException if async is true while asynchronous execution is disabled
 */
private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
// Asynchronous starts are rejected unless enabled in the configuration.
if (async && !stateMachineConfig.isEnableAsync()) {
throw new EngineExecutionException(
"Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.",
FrameworkErrorCode.AsynchronousStartDisabled);
}
// Fall back to the configured default tenant.
if (StringUtils.isEmpty(tenantId)) {
tenantId = stateMachineConfig.getDefaultTenantId();
}
StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
// Assemble the process context: START operation, first instruction, instance, config and engine.
ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
.withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(
new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)
.withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);
// The context variables live in a fresh ConcurrentHashMap seeded from startParams.
// NOTE(review): nullSafeCopy presumably skips null keys/values, which ConcurrentHashMap rejects; confirm.
Map<String, Object> contextVariables;
if (startParams != null) {
contextVariables = new ConcurrentHashMap<>(startParams.size());
nullSafeCopy(startParams, contextVariables);
} else {
contextVariables = new ConcurrentHashMap<>();
}
// The same map is shared by the instance and the process context.
instance.setContext(contextVariables);
contextBuilder.withStateMachineContextVariables(contextVariables);
contextBuilder.withIsAsyncExecution(async);
ProcessContext processContext = contextBuilder.build();
// Record the start only for persistent state machines with a log store configured.
if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
}
// If the instance still has no id at this point, generate one from the sequence generator.
if (StringUtils.isEmpty(instance.getId())) {
instance.setId(
stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
}
// Hand the context to the asynchronous or synchronous event publisher.
if (async) {
stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
} else {
stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
}
return instance;
}

4.1.2 状态机的流程处理

状态机的流程处理主要包括两个部分:执行state、路由到下一个state。在ProcessController中定义了状态机的执行流程:

  • 找到合适的Processor,然后执行state中指定的方法。
  • 路由到下一个State,并执行state。

ProcessControllerImpl代码如下所示:
/**
 * Runs one step of the flow: lets the business processor process the context,
 * then lets it route the context.
 *
 * @param context the process context of the current step
 * @throws FrameworkException rethrown unchanged when raised by the business processor;
 *                            any other exception is logged and wrapped with UnknownAppError
 */
public void process(ProcessContext context) throws FrameworkException {
try {
businessProcessor.process(context);
businessProcessor.route(context);
} catch (FrameworkException fex) {
// Framework exceptions already carry an error code; pass them through as-is.
throw fex;
} catch (Exception ex) {
LOGGER.error("Unknown exception occurred, context = {}", context, ex);
throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
}
}

4.2 处理器

4.2.1 根据状态机json配置找到对应的Processor

StateMachineProcessHandler对状态机的每一种type都配置了handler,代码如下所示:

/**
 * Registers one handler per state type. Does nothing when the handler map
 * already contains entries.
 */
public void initDefaultHandlers() {
    // Something is already registered: keep the existing mapping untouched.
    if (!stateHandlers.isEmpty()) {
        return;
    }

    // Task-like states. Sub-machine compensation shares the service task handler.
    stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, new ServiceTaskStateHandler());
    stateHandlers.put(DomainConstants.STATE_TYPE_SCRIPT_TASK, new ScriptTaskStateHandler());
    stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, new ServiceTaskStateHandler());
    stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, new SubStateMachineHandler());

    // Branching and terminal states.
    stateHandlers.put(DomainConstants.STATE_TYPE_CHOICE, new ChoiceStateHandler());
    stateHandlers.put(DomainConstants.STATE_TYPE_SUCCEED, new SucceedEndStateHandler());
    stateHandlers.put(DomainConstants.STATE_TYPE_FAIL, new FailEndStateHandler());

    // Compensation entry point.
    stateHandlers.put(DomainConstants.STATE_TYPE_COMPENSATION_TRIGGER, new CompensationTriggerStateHandler());
}

结合前面的示例数据,分析状态机是如何找到合适的State处理器的:

"States": {
"ReduceInventory": {
"Type": "ServiceTask",  -> 使用ServiceTaskStateHandler处理
"ServiceName": "inventoryAction", -> 对应到bean的名字
"ServiceMethod": "reduce",          ->对应到bean中的具体方法
"CompensateState": "CompensateReduceInventory", -> 补偿的state
"Next": "ChoiceState",      ->下一个state
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
}
}
/**
 * Executes the current state with the handler registered for its type, wrapped by
 * that handler's interceptors when it is an InterceptableStateHandler.
 *
 * <p>preProcess is called in list order; postProcess is called in reverse order, in a
 * finally block, for every interceptor whose preProcess was attempted.
 *
 * @param context the process context carrying the current StateInstruction
 * @throws FrameworkException propagated from the interceptors or the handler
 */
public void process(ProcessContext context) throws FrameworkException {
StateInstruction instruction = context.getInstruction(StateInstruction.class);
State state = instruction.getState(context);
String stateType = state.getType();
// Pick the handler by the state's "Type".
StateHandler stateHandler = stateHandlers.get(stateType);
List<StateHandlerInterceptor> interceptors = null;
if (stateHandler instanceof InterceptableStateHandler) {
interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
}
List<StateHandlerInterceptor> executedInterceptors = null;
Exception exception = null;
try {
if (CollectionUtils.isNotEmpty(interceptors)) {
executedInterceptors = new ArrayList<>(interceptors.size());
for (StateHandlerInterceptor interceptor : interceptors) {
// Recorded before preProcess runs, so an interceptor whose preProcess throws still gets postProcess.
executedInterceptors.add(interceptor);
interceptor.preProcess(context);
}
}
stateHandler.process(context);
} catch (Exception e) {
// Remember the failure so that postProcess can see it, then rethrow.
exception = e;
throw e;
} finally {
if (CollectionUtils.isNotEmpty(executedInterceptors)) {
// Unwind in reverse order; exception is null when processing succeeded.
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateHandlerInterceptor interceptor = executedInterceptors.get(i);
interceptor.postProcess(context, exception);
}
}
}
}

4.2.2 执行前置拦截器

主要包括以下内容:

  • 注册分支事务
  • 向本地数据库插入state(state_inst表)。

代码见DbAndReportTcStateLogStore#recordStateStarted:
/**
 * Records that a state instance has started: assigns its id, serializes its input
 * parameters and writes it to the database. Does nothing when stateInstance is null.
 *
 * <p>Only a state that is neither a retry nor a compensation goes through branchRegister.
 *
 * @param stateInstance the state instance being started; may be null
 * @param context       the current process context
 */
public void recordStateStarted(StateInstance stateInstance, ProcessContext context) {
if (stateInstance != null) {
// In update mode the existing row is updated instead of inserting a new one (see the branch below).
boolean isUpdateMode = isUpdateMode(stateInstance, context);
// if this state is for retry, do not register branch
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {
if (isUpdateMode) {
// Reuse the id of the state being retried.
stateInstance.setId(stateInstance.getStateIdRetriedFor());
} else {
// generate id by default
stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
}
// if this state is for compensation, do not register branch
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode));
} else {
// NOTE(review): branchRegister presumably assigns the state instance id as a side effect; confirm.
branchRegister(stateInstance, context);
}
// Fallback: no id assigned above, so take one from the sequence generator if available.
if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
}
stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams()));
if (!isUpdateMode) {
// Normal case: insert a new state instance row.
executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType),
STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance);
} else {
// if this retry/compensate state do not need persist, just update last inst
executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType),
stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()),
stateInstance.getMachineInstanceId(), stateInstance.getId());
}
}
}

4.2.3 Processor执行State

当type=ServiceTask时,将会由ServiceTaskStateHandler处理,具体逻辑如下:

  • 从state中获取beanname以及methodName。
  • 创建serviceInvoker对象。
  • serviceInvoker首先从applicationContext获取bean,并通过反射找到method。
  • 调用method方法。
  • 返回结果。 代码如下:
/**
 * Executes a service task state: marks the state instance as running, invokes the
 * configured service (or compensates a sub state machine), and stores a non-null
 * result as the state's output.
 *
 * <p>Any Throwable raised during execution is logged, stored in the context as the
 * current exception and passed to EngineUtils.handleException.
 *
 * @param context the process context carrying the instruction, state instance and input parameters
 * @throws EngineExecutionException declared by the handler contract
 */
public void process(ProcessContext context) throws EngineExecutionException {
StateInstruction instruction = context.getInstruction(StateInstruction.class);
ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);
String serviceName = state.getServiceName();
String methodName = state.getServiceMethod();
StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);
Object result;
try {
List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);
//Set the current task execution status to RU (Running)
stateInstance.setStatus(ExecutionStatus.RU);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[{}], ServiceName[{}], Method[{}], Input:{}",
state.getName(), serviceName, methodName, input);
}
if (state instanceof CompensateSubStateMachineState) {
//If it is the compensation of the substate machine,
// directly call the state machine's compensate method
result = compensateSubStateMachine(context, state, input, stateInstance,
(StateMachineEngine) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_ENGINE));
} else {
StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
// The invoker is selected by the state's service type.
ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(
state.getServiceType());
if (serviceInvoker == null) {
throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",
FrameworkErrorCode.ObjectNotExists);
}
// Give Spring-aware invokers the application context before invoking.
if (serviceInvoker instanceof ApplicationContextAware) {
((ApplicationContextAware) serviceInvoker).setApplicationContext(
stateMachineConfig.getApplicationContext());
}
result = serviceInvoker.invoke(state, input.toArray());
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",
state.getName(), serviceName, methodName, result);
}
// A null result leaves the output parameters untouched.
if (result != null) {
stateInstance.setOutputParams(result);
((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_OUTPUT_PARAMS,
result);
}
} catch (Throwable e) {
// Covers every failure above, including the EngineExecutionException thrown for a missing invoker.
LOGGER.error("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute failed.",
state.getName(), serviceName, methodName, e);
((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_CURRENT_EXCEPTION, e);
// NOTE(review): handleException presumably applies the state's "Catch" routing and may rethrow; confirm.
EngineUtils.handleException(context, state, e);
}
}

4.2.4 执行后置拦截器

执行完业务代码以后,会进入拦截器后置处理流程,主要包括以下内容:

  • 更新数据库中state状态(state_inst表)
  • 向TC报告Branch事务执行结果

4.3 路由

4.3.1 找到下一个state

路由的具体过程如下:

  • 在state中找到Next节点
  • 然后执行Expression表达式找到下一个state。
StateMachineProcessRouter代码:
/**
 * Determines the next instruction after the current state by delegating to the
 * router registered for the state's type, wrapped by that router's interceptors
 * when it is an InterceptableStateRouter.
 *
 * <p>In the finally block, the state machine is ended when no next instruction was
 * produced and the current instruction is not already marked as ended.
 *
 * @param context the process context carrying the current StateInstruction
 * @return the next instruction, or null when there is none
 * @throws FrameworkException propagated from the interceptors or the router
 */
public Instruction route(ProcessContext context) throws FrameworkException {
StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
State state;
if (stateInstruction.getTemporaryState() != null) {
// A temporary state takes precedence and is consumed (cleared) once read.
state = stateInstruction.getTemporaryState();
stateInstruction.setTemporaryState(null);
} else {
// Otherwise resolve the state by name from the state machine definition in the repository.
StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(
stateInstruction.getStateMachineName(), stateInstruction.getTenantId());
state = stateMachine.getStates().get(stateInstruction.getStateName());
}
String stateType = state.getType();
// Pick the router by the state's "Type".
StateRouter router = stateRouters.get(stateType);
Instruction instruction = null;
List<StateRouterInterceptor> interceptors = null;
if (router instanceof InterceptableStateRouter) {
interceptors = ((InterceptableStateRouter)router).getInterceptors();
}
List<StateRouterInterceptor> executedInterceptors = null;
Exception exception = null;
try {
if (CollectionUtils.isNotEmpty(interceptors)) {
executedInterceptors = new ArrayList<>(interceptors.size());
for (StateRouterInterceptor interceptor : interceptors) {
// Recorded before preRoute runs, so an interceptor whose preRoute throws still gets postRoute.
executedInterceptors.add(interceptor);
interceptor.preRoute(context, state);
}
}
instruction = router.route(context, state);
} catch (Exception e) {
// Remember the failure so that postRoute can see it, then rethrow.
exception = e;
throw e;
} finally {
if (CollectionUtils.isNotEmpty(executedInterceptors)) {
// Unwind in reverse order; exception is null when routing succeeded.
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateRouterInterceptor interceptor = executedInterceptors.get(i);
interceptor.postRoute(context, state, instruction, exception);
}
}
//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
// This also runs when routing threw, because instruction is still null in that case.
if (instruction == null && !stateInstruction.isEnd()) {
EngineUtils.endStateMachine(context);
}
}
return instruction;
}
TaskStateRouter代码:
/**
 * Chooses the state to execute after a task state.
 *
 * <p>Precedence, as implemented below: an ended instruction stops routing; an active
 * compensation trigger switches to compensation routing; otherwise the next state
 * name comes from the exception route, then the state's own Next, then the choice
 * stored in the context.
 *
 * @param context the process context
 * @param state   the state that has just been executed
 * @return the instruction updated to point at the next state, the result of
 *         compensateRoute, or null when there is nothing further to route to
 * @throws EngineExecutionException if the resolved next state name is not defined in the state machine
 */
public Instruction route(ProcessContext context, State state) throws EngineExecutionException {
StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
if (stateInstruction.isEnd()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"StateInstruction is ended, Stop the StateMachine executing. StateMachine[{}] Current State[{}]",
stateInstruction.getStateMachineName(), state.getName());
}
return null;
}
//The current CompensationTriggerState can mark the compensation process is started and perform compensation
// route processing.
State compensationTriggerState = (State)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);
if (compensationTriggerState != null) {
return compensateRoute(context, compensationTriggerState);
}
//There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
if (StringUtils.hasLength(next)) {
// The exception route is single-use: clear it once consumed.
context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
} else {
next = state.getNext();
}
//If next is empty, the state selected by the Choice state was taken.
if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {
next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
}
// No next state could be resolved: return null to signal that there is nothing to route to.
if (!StringUtils.hasLength(next)) {
return null;
}
StateMachine stateMachine = state.getStateMachine();
State nextState = stateMachine.getState(next);
if (nextState == null) {
throw new EngineExecutionException("Next state[" + next + "] is not exits",
FrameworkErrorCode.ObjectNotExists);
}
// Reuse the current instruction, now pointing at the next state.
stateInstruction.setStateName(next);
return stateInstruction;
}

4.3.2 结束事务

在StateMachineProcessRouter#route方法的finally块中,我们可以看到,当没有下一个state(instruction为null)且状态机尚未结束时,将会通过以下代码结束事务:

if (CollectionUtils.isNotEmpty(executedInterceptors)) {
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateRouterInterceptor interceptor = executedInterceptors.get(i);
interceptor.postRoute(context, state, instruction, exception);
}
}
//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if (instruction == null && !stateInstruction.isEnd()) {
EngineUtils.endStateMachine(context);
}

结束事务的具体过程:

  • 更新状态机(state_machine_inst)状态为成功或失败。
  • 向TC汇报分布式事务的状态,即:通知TC进行global commit/rollback。

4.4 TC接收到通知全局事务Global Commit/Rollback

Saga模式下TC中执行的内容和AT模式非常相似,不过在TC收到Global Commit/Rollback时,TC仅修改全局事务状态,而不会立即进行回滚操作。具体是通过DefaultCoordinator中retryRollbacking、retryCommitting定时任务完成。

4.5 RM处理Global Commit/Rollback

Saga中分支事务参与者不管理分支事务的状态,所有状态均在TM中基于state进行管理,所以TC通知Global Commit/Rollback时,TM会作为RM来完成state的状态管理(Commit/Rollback)。

Commit流程:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机继续执行

Rollback:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机执行补偿流程。

在SagaResourceManager中,有branchCommit、branchRollback的处理逻辑,如下:

/**
 * Handles a commit request for a Saga transaction by forwarding the state machine
 * identified by xid, then mapping the resulting instance status to a BranchStatus.
 *
 * <p>Only xid is used; branchType, branchId, resourceId and applicationData are not read.
 *
 * @return PhaseTwo_Committed when the machine succeeded without compensation, or when
 *         the instance no longer exists; PhaseTwo_Rollbacked when compensation succeeded;
 *         PhaseTwo_RollbackFailed_Retryable when compensation failed or is unknown;
 *         PhaseOne_Failed when the machine failed without compensation;
 *         PhaseTwo_CommitFailed_Retryable in every other case, including unexpected exceptions
 * @throws TransactionException declared by the resource manager contract
 */
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
try {
// Resume the state machine instance identified by the global transaction id.
StateMachineInstance machineInstance = StateMachineEngineHolder.getStateMachineEngine().forward(xid, null);
if (ExecutionStatus.SU.equals(machineInstance.getStatus())
&& machineInstance.getCompensationStatus() == null) {
return BranchStatus.PhaseTwo_Committed;
} else if (ExecutionStatus.SU.equals(machineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_Rollbacked;
} else if (ExecutionStatus.FA.equals(machineInstance.getCompensationStatus()) || ExecutionStatus.UN.equals(
machineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} else if (ExecutionStatus.FA.equals(machineInstance.getStatus())
&& machineInstance.getCompensationStatus() == null) {
return BranchStatus.PhaseOne_Failed;
}
} catch (ForwardInvalidException e) {
LOGGER.error("StateMachine forward failed, xid: " + xid, e);
//if StateMachineInstanceNotExists stop retry
if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
return BranchStatus.PhaseTwo_Committed;
}
} catch (Exception e) {
LOGGER.error("StateMachine forward failed, xid: " + xid, e);
}
// Anything not mapped above is reported as a retryable commit failure.
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
/**
 * Handles a rollback request for a Saga transaction by compensating the state
 * machine identified by xid.
 *
 * <p>When the state machine's recover strategy is Forward and applicationData names a
 * timeout-rollback global status, no compensation is run and a retryable commit
 * failure is returned instead.
 *
 * @return PhaseTwo_Rollbacked when the instance does not exist or compensation succeeded;
 *         PhaseTwo_CommitFailed_Retryable for the Forward-on-timeout case;
 *         PhaseTwo_RollbackFailed_Retryable in every other case, including unexpected exceptions
 * @throws TransactionException declared by the resource manager contract
 */
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
try {
StateMachineInstance stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().reloadStateMachineInstance(xid);
// Nothing to compensate: report the rollback as done.
if (stateMachineInstance == null) {
return BranchStatus.PhaseTwo_Rollbacked;
}
// applicationData is compared against global status names here.
if (RecoverStrategy.Forward.equals(stateMachineInstance.getStateMachine().getRecoverStrategy())
&& (GlobalStatus.TimeoutRollbacking.name().equals(applicationData)
|| GlobalStatus.TimeoutRollbackRetrying.name().equals(applicationData))) {
LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", xid);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
// Run the compensation flow and re-read the resulting instance.
stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().compensate(xid,
null);
if (ExecutionStatus.SU.equals(stateMachineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_Rollbacked;
}
} catch (EngineExecutionException e) {
LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
//if StateMachineInstanceNotExists stop retry
if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
return BranchStatus.PhaseTwo_Rollbacked;
}
} catch (Exception e) {
LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
}
// Compensation did not succeed: report a retryable rollback failure.
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}

5 最佳实践

  • 适用场景 业务流程长、业务流程多
  • 优势 一阶段提交本地事务,无锁,高性能。 事件驱动架构,参与者可异步执行,高吞吐。 补偿服务易于实现。
  • 缺点 不保证隔离性
  • 注意 因为需要自己实现正向服务和逆向补偿服务,所以TCC模式遇到的问题,此模式一样存在,即:
  • 允许空补偿
  • 防悬挂控制
  • 幂等控制

6 参考文档

Seata Saga模式

分布式事务 Seata 及其三种模式详解 | Meetup#3 回顾

Seata 原理

Seata-AT模式 原理

Seata-TCC模式 原理

Seata-Saga模式 原理

Seata-XA模式 原理

TCC-Transaction原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/191742.html原文链接:https://javaforall.cn