zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

数据库内核月报 - 2015 / 08-MySQL · 功能分析 · 5.6 并行复制实现分析

mysql数据库内核 实现 分析 2015 08 5.6
2023-09-14 09:00:57 时间

我们知道MySQL的主备同步是通过binlog在备库重放进行的,IO线程把主库binlog拉过去存入relaylog,然后SQL线程重放 relaylog 中的event,然而这种模式有一个问题就是SQL线程只有一个,在主库压力大的时候,备库单个SQL线程是跑不过主库的多个用户线程的,这样备库延迟是不可避免的。为了解决这种n对1造成的备库延迟问题,5.6 引入了并行复制机制,即SQL线程在执行的时候可以并发跑。

关于其背后的设计思想,可以参考这几个worklog WL#4648WL#5563WL#5569WL#5754WL#5599,之前的月报也对并行复制原理进程了阐述,读者朋友可以回顾下

本篇将从代码实现角度讲述并行复制是如何做的,分析基于MySQL 5.6.26。

binlog

binlog 是对数据库更改操作的记录,里面是一个个的event,如类似下面的event序列:

Query_log

Table_map

Write/Delete/Update_row_event

关于每个event的含义可以参考官方文档

并行复制提供了几个参数配置,可以通过修改参数值对其进行调节。

slave_parallel_workers // worker 线程个数

slave-checkpoint-group // 隔多少个事务做一次 checkpoint

slave-checkpoint-period // 隔多长时间做一次 checkpoint

slave-pending-jobs-size-max // 分发给worker的、处于等待状态的event的大小上限

下面是并行复制中用到几个概念:

MTS // Multi-Threaded Slave,并行复制

group // 一个事务在binlog中对应的一组event序列

worker // 简称W,event 执行线程,MTS新引入的

Coordinator // 简称C,分发协作线程,就是之前的 SQL线程

checkpoint // 简称CP,检查点,C线程在满足一定条件下去做,目的是收集W线程执行完信息,向前推动执行位点

B-event // 标志事务开始的event,BEGIN 这种Query或者GTID

G-event // 包含分发信息的event,如Table_map、Query

T-event // 标志事务结束的event,COMMIT/ROLLBACK 这种Query 或者XID

相关代码文件

sql/rpl_rli_pdb.h // pdb的是 parallelized by db name简写WL#5563
sql/rpl_rli_pdb.cc
sql/rpl_slave.cc
sql/log_event.cc
sql/rpl_rli.h

并行执行原则 并行执行的基本模型是生产者-消费者,C线程将event按db插入各W线程的任务队列,W线程从队列里取出event执行;
MTS 并行复制模型 同一个group(事务)内的event都发给同一个worker,保证事务的一致性; 分发关系由包含db信息的event(G-evnet)决定,其它event按决定好的关系进行分发; 重要数据结构

db_worker_hash_entry,db- worker 映射关系,也即分发关系,所有的分发关系缓存在C的一个HASH表中(APH)

 - db // db 名

 - worker // 指向worker的指针,表示被分发到的W线程

 - usage // 有多少正在分发的group用到这个关系

 - temporary_tables // 用于在C和W之前传递临时表


circular_buffer_queue,用DYNAMIC_ARRAY arrary实现的一个首尾相连的环形队列,是其他重要数据结构的基类

 - Q // 底层用到的 DYNAMIC_ARRAY

 - size // Queue 的容量

 - avail // 队列尾

 - entry // 队列头

 - len // 队列实际大小

 - de_queue() // 出队操作

 - de_tail() // 尾部出队

 - en_queue() // 入队

 - head_queue() // 取队列头,但是不出队


Slave_job_group,维护一个正在执行的事务的信息,如对应的位点信息、事务分发到的worker、有没有执行完等。

 - group_master_log_name // 对应主库的 binlog 文件名

 - group_master_log_pos // 对应在主库 binlog 中的位置

 - group_relay_log_name // 对应备库 relaylog 文件名

 - group_relay_log_pos // 对应在备库 relaylog 中的位置

 - worker_id // 对应的worker的id

 - worker // worker 指针

 - total_seqno // 当前group是启动以来执行的第几个group

 - master_log_pos // group中B-event的位置

 - checkpoint_seqno // 当前group是从上次做完CP后的第几个group

 - checkpoint_log_pos // worker收到checkpoint信号后更新

 - checkpoint_log_name // 同上

 - checkpoint_relay_log_pos // 同上

 - checkpoint_relay_log_name // 同上

 - done // 这个group是否已经被worker commit掉

 - shifted // checkpoint 的时候shift值

 - ts // 时间,更新SBM

 - reset() // 重置上面的成员变量


Slave_committed_queue,维护分发执行的group信息,是circular_buffer_queue的子类,队列里存的时 Slave_job_group

 - lwm // 类型是Slave_job_group,低水位(Low-Water-Mark),表示上次CP执行到的位置

 - last_done // 类型是一个DYNAMIC_ARRAY,里面存的是Slave_job_group:total_seqno,表示每个worker执行到第几个group

 - assigned_group_index // 正在分发的group在GAQ中的位置

 - move_queue_head() // 做checkpoint时,把已经commit的group移出队列

 - get_job_group() // 返回队列指定位置的Slave_job_group

 - en_queue() // 入队一个 Slave_job_group


Slave_jobs_queue,任务队列,也是circular_buffer_queue的子类,队列里存的是slave_job_item,每个worker有一个这样的任务队列

 - overfill // 队列满标志

 - waited_overfill // 队列满的次数


Slave_worker,对应一个worker,Relay_log_info 的子类

 - jobs // 类型是 Slave_jobs_queue,C分发过来的event都放在这里面

 - c_rli // 指向C的指针

 - curr_group_exec_parts // 类型是 DYNAMIC_ARRAY,里面存的是当前group用到的分发关系,是指向APH成员的指针,简写CGEP

 - curr_group_seen_begin // 当前所在 group 有没有解析到 B-event

 - id // worker 的id标识

 - last_group_done_index // worker上一次执行的group在GAQ中的位置

 - gaq_index // worker 当前执行的的事务在GAQ中的位置

 - usage_partition // worker用到的分发关系个数

 - end_group_sets_max_dbs // 和串行执行相关的

 - bitmap_shifted // CP后bitmap需要偏移的距离,用于调整 group_executed

 - wq_overrun_cnt // 超载多少

 - overrun_level // 超载指标

 - underrun_level // 饥饿指标

 - excess_cnt // 用于往mts_wq_excess_cnt累计

 - group_executed // 类型是 MY_BITMAP,标示CP后执行的group

 - group_shifted // 类型是 MY_BITMAP,计算group_executed,临时用作中间变量

 - running_status // 标识 worker 线程的状态,可以有 NOT_RUNNING、RUNNING、ERROR_LEAVING、KILLED

 - slave_worker_ends_group () // 当一个group执行完或者异常终止时会调用

 - commit_positions() // group执行完是调用,用于更新位点和bitmap

 - rollback_positions() // 回滚bitmap


Relay_log_info,对应C线程,在MTS之前对应SQL线程,为了支持并行复制,在原来的基础上又加了一些成员

 - mapping_db_to_worker // 非常重要的成员,类型是HASH,用于缓存所有的分发关系,APH(Assigned Partition Hash),目的能通过db快速找到映射关系,但HASH长度大于mts_partition_hash_soft_max(固定16)时,会对没有使用的映射关系进行回收。

 - workers // 类型是 DYNAMIC_ARRAY,成员是一个个Slave_worker

 - pending_jobs // 一个统计信息,表示待执行job个数

 - mts_slave_worker_queue_len_max // 每个worker最多能容纳jobs的个数,目前hard code是16384

 - mts_pending_jobs_size // 所有worker的job占的内存

 - mts_pending_jobs_size_max // 所有worker的job占的内存,对应配置 slave_pending_jobs_size_max

 - mts_wq_oversize // 标示job占用内存已达上限

 - gaq // 非常重要的成员,代码注释里经常提到的GAQ,类型是Slave_committed_queue,存的成员是Slave_job_group,大小对应配置 slave-checkpoint-group,用于W和C交互

 - curr_group_assigned_parts // 类型是 DYNAMIC_ARRAY,当前group中已经分配的event的映射关系,可以和Slave_worker的curr_group_exec_parts对应,简写CGAP

 - curr_group_da // 类型是DYNAMIC_ARRAY,对于还无法决定分发worker的event,先存在这里

 - mts_wq_underrun_w_id // 标识比较空闲的worker的id

 - mts_wq_excess_cnt // 标示worker的超载情况

 - mts_worker_underrun_level // 当W的任务队列大小低于这个值的认为处于饥饿状态

 - mts_coordinator_basic_nap // 当work负载较大时,C线程sleep,会用到这个值

 - opt_slave_parallel_workers // 对应配置 slave_parallel_workers

 - slave_parallel_workers // 当前实际的worker数

 - exit_counter // 退出时用

 - max_updated_index // 退出时用

 - checkpoint_seqno // 上次CP后分发的group个数

 - checkpoint_group // 对应配置 mts_checkpoint_group

 - recovery_groups // 类型是 MY_BITMAP,恢复时用到

 - mts_group_status // 分发线程所处的状态,取值为 MTS_NOT_IN_GROUP、MTS_IN_GROUP、MTS_END_GROUP、MTS_KILLED_GROUP

 - mts_events_assigned // 分发的event计数

 - mts_groups_assigned // 分发的group计数

 - least_occupied_workers // 类型是 DYNAMIC_ARRAY,从注释将worker按从空闲到繁忙排序的一个数组,用于先worker用,但是实际并未用到。 

 - last_clock // 上次做checkpoint的时间


 map_db_to_worker() // 把db映射给worker

 get_least_occupied_worker() // 获取负载最小的worker

 wait_for_workers_to_finish() // 等待worker完成,并发临时转成串行是用到

 append_item_to_jobs() // 把任务分发给 worker

 mts_move_temp_table_to_entry() // 用于传递临时表

 mts_move_temp_tables_to_thd() // 同上


和单线程SQL相比,MTS需要初始化新加的MTS变量和启动worker线程。

主要是slave_start_workers()这个函数。会初始化C线程的MTS变量,如workers、curr_group_assigned_parts、curr_group_da、gaq等,接着调用init_hash_workers() 初始化HASH表mapping_db_to_worker,在这些做完后依次调用 slave_start_single_worker() 初始化每个worker并启动W线程。worker 的的初始化包括jobs任务队列、curr_group_exec_parts 等相关变量,其中jobs长度目前是固定的16384,目前还不可配置;worker线程的主函数是handle_slave_worker(),不停的调用slave_worker_exec_job()来执行C分配的event。

Coordinator 分发协作

分发线程主体和之前的SQL线程基本是一样的,不停的调用 exec_relay_log_event() 函数。exec_relay_log_event()主要分2部分,一是调用next_event()读取relay log,一是apply_event_and_update_pos() 做分发。

next_event() 比较简单,就是不停的用 Log_event::read_log_event() 从relay log 读取event,除此之外还会调用mts_checkpoint_routine() 做checkpoint,后面会详细讲checkpiont过程。

apply_event_and_update_pos()进行分发的入口是Log_event::apply_event(),如果没有开MTS,就是原来的逻辑,SQL线程直接执行event,如果开了MTS的话,调用get_slave_worker(),这个是分发的主逻辑。

在介绍分发逻辑前,先将所有的binlog event 可以分下类(代码里是这么分的):

B-event // BEGIN(Query) 或者 GTID

G-event // 包含db信息的event,Table_map 或者 Query

P-event // 一般放在G-event前的,如int_var、rand、user_var等

R-event // 一般放在G-event后的,如各种Rows event

T-event // COMMIT/ROLLBACK(Query) 或者XID

分发逻辑是这样的:

如果是B-event,表明是事务的开始,mts_groups_assigned 计数加1,同时GAQ中入队一个新的group,表示一个新的事务开始,然后把event放入curr_group_da,因为B-event没有db信息,还不知道分发给哪个worker; 如果是G-event,event里包含db信息,就需要按这个db找到一个分发到的worker,worker选择机制是map_db_to_worker()实现。调用map_db_to_worker()时,有2个参数比较重要,一个是dbname,这个就是分发关系的key,一个是last_worker,表示当前group中event上一次分发到的worker(last_assigned_worker); 在当前group已经用到的映射关系(curr_group_assigned_parts CGAP)中找,如果有同db的映射关系,就直接返回last_worker;如果找不到,就去APH中按db名搜索; 如果APH中搜到的话,分3种情况,a) 这个映射关系没有group用到,就直接把db映射为last_worker,如果last_worker为空的主话,就找一个最空闲的worker,get_least_occupied_worker() b) 这个映射关系有group用,并且对应的worker和last_worker一样,就用last_worker,映射关系引用计数加1 c) 如果映射关系对应的worker和last_worker不一样,这表示有冲突,就需要等到引用这个映射关系的group全部执行完,然后把db映射为last_worker; 如果没搜到的话,就新生成一个映射关系,key用db,value用last_worker,如果last_worker为空的话,选最空闲的worker,get_least_occupied_worker(),并把新生成的映射插入到APH中,如果HASP表长度大于 mts_partition_hash_soft_max 的话,在插入前会对APH做一次收缩,从中去除掉没有被group引用的映射关系; 把选择的映射关系插入到 curr_group_assigned_parts 中。

什么时候切换为串行?
如果G-event包含的db个数大于MAX_DBS_IN_EVENT_MTS(16个),或者更新的表被外键依赖,那么就需要串行执行当前group。串行固定选用第0个worker来执行,在分发前会等待其它worker全部执行完,在分发后会等待所有worker执行完。gropu执行完后自动切换为并行执行。

worker 确定好了,下一步就是分发event了,入口函数 append_item_to_jobs()。这个函数的作用非常明确,就是把event插入到worker的jobs队列中,在插入前会有对event大小有检查:

如果event大小已经超过了等待任务大小的上限(配置slave-pending-jobs-size-max ),就报event太大的错,然后返回; 如果event大小+已经在等待的任务大小超过了slave-pending-jobs-size-max,就等待,至到等待队列变小; 如果当前的worker的队列满的话,也等待。 Worker 执行

W线程执行的主逻辑是 slave_worker_exec_job():

从自己的job队列里取出event; 根据event的信息,来更新worker中的变量,如curr_group_exec_parts(CGEP)、future_event_relay_log_pos、gaq_index等; 执行event,do_apply_event_worker(),最终调用每个event的do_apply_event()方法,和单线程下一样; 如果是T event,调用 slave_worker_ends_group(),表示一个事务已经执行完了,a) 更新位点,通过commit_positions(),更新事务在GAQ中对应的Slave_job_group,这样C就知道W执行到哪了,另外还会更新W的bitmap信息(如果是xid event,在apply_event中就会调用commit_positions) b) 清空 curr_group_exec_parts,将映射关系中的引用数减1; 更新C的队列统计信息,如等待执行任务数pending_jobs,等待执行任务大小mts_pending_jobs_size等; 更新 overrun 和 underrun 状态。

分发和执行逻辑可以用下图简单表示:
MTS 分发逻辑
C线程在GAQ中插入group,标示一个要执行的事务,接着确定分发关系(从CGAP或者APH中,或者生成新的),然后按映射关系把event分发给对应worker的job队列;worker在执行event过程中更新自己的CGEP,在执行完整个group后,根据CGEP中的记录去更新APH中引用关系的计数,同时把GAQ中的对应group标示为done。

checkpoint 过程

如前所述,C线程会在从relaylog读取event后,会尝试做checkpoint,入口函数是mts_checkpoint_routine()。checkpoint的作用是把worker执行完的事务从GAQ中去除,向前推进事务完成点。

有2个条件会触发checkpoint:

当前时间距上次checkpoint已经超过配置 mts-checkpoint-period,这时会尝试做一次checkpoint,不管有没有向前推进事务; 上一次checkpoint后分发的事务数已经到达checkpoint设置上限(slave-checkpoint-group),这时会强制做checkpoint,如果一次checkpoint没成功,会一直重试,直至成功。

GAQ中的事务推进通过 Slave_committed_queue::move_queue_head() 实现,从前向后扫描GAQ中的group:

如果当前group已经完成(通过标志Slave_job_group.done标志确认),就把这个group出队,同时把这个出队的group信息赋给低水位lwm,向前推进; 如果遇到没有完成的group,就是遇到一个gap,表示对应worker还没执行完当前group,checkpoint不能再向前推进了,到此结束,返回值就是退出前已经推进的group个数。

MTS checkpoint逻辑

slave 停止

类似单线程复制,stop slave 命令会终止C线程和W线程的运行。

C线程收到退出信号后,会先调用slave_stop_workers()终止W线程,过程如下:

依次把每个运行中的 worker 的 runnig_status 设置Slave_worker::STOP,同时设置worker执行终止位置rli- max_updated_index; C线程等待所有W线程终止(w- running_status == Slave_worker::NOT_RUNNING); 调用mts_checkpoint_routine(),做一次checkpoint; 释放资源,如APH、GAQ、CGDA(curr_group_da)、CGAP(curr_group_assigned_parts)等。

W线程在pop_jobs_item()中会调用set_max_updated_index_on_stop(),会检查2个条件 1) job队列是空的,2) 当前worker执行的事务在GAQ中的位置,是否已经超过rli- max_updated_index;任一条件满足就设置状态 running_status 为 Slave_worker::STOP_ACCEPTED,表示开始退出。

从上面的逻辑可以看出,在收到stop信号后,worker线程会等正在执行的group完成后,才会退出。

W被kill或者执行出错


slave_worker_exec_job() 进入错误处理逻辑,调用Slave_worker::slave_worker_ends_group(),给C线程发KILL_QUERY信号,然后做相关变量的清理,把job队列的任务全部清理掉,最终把running_status置为Slave_worker::NOT_RUNNING,表示结束; C线程收到kill信号后,停止分发,然后进入slave_stop_workers()逻辑,给活跃的W线程发送STOP信号; 其它W线程收到STOP信号后,会处理job队列中所有的event; 和stop slave不同的是,C线程最后不会做checkpoint。

C被kill

C被kill的处理逻辑和stop slave差不多,不同之处在于等worker全部终止后,不会做checkpoint。

Slave线程重启(正常关闭或者异常kill)后,需要根据Coordinator和每个Worker的记录信息来进行恢复,推进到一个一致状态后再开始并行,详细过程我们下期月报再分析。

存在的问题

5.6 的MTS是按db来进行分发的,分发粒度太大,如果只有一个db的时候,就没有并发性了,所有group都分给一个worker,就变成单线程执行了。一个简单的优化改进是改成按table来分发,只需要把分发的key从dbname改成dbname + tablename,整体分发逻辑不需要变动。再进一步,如果遇到热点表更新呢,这时候binlog里记录的event都是针对一个表的更新,又会变成串行执行。这个时候就需要变化一下分发测略喽,如按事务维度进行分发,这个策略对源码的改动就会比较大些,有需要的同学可以试试:-)


TDSQL中修复的mysql内核bug 在TDSQL这两年多的开发工作中,我感觉很自豪的一件事是我修复了不少mysql-5.7.17和mariadb-10.1.9的内核bug,这些bug大多已经报告给了MySQL/MariaDB官方开发团队,在每个bug描述中我会贴出来bug报告的连接。本文将大略介绍这些bug的概况,我在将来会写更多文章详细介绍每个bug的具体问题分析以及解决思路。本文列出的所有bug都已经修复,经过验证可以正确工作并解决相关问题。 这里先说一下为什么我要提交代码给mysql/mariadb官方开发团队,主要有一下几个好处: 1. 官方开发者可以review我提交的patch,帮助完善patch,发现和解决之前
MySQL · 内核特性 · 统计信息的现状和发展 简介我们知道查询优化问题其实是一个搜索问题。基于代价的优化器 ( CBO ) 由三个模块构成:计划空间、搜索算法和代价估计 [1] ,分别负责“看到”最优执行计划和“看准”最优执行计划。如果不能“看准”最优执行计划,那么优化器基本上就是瞎忙活,甚至会产生严重的影响,出现运算量特别大的 SQL ,造成在线业务的抖动甚至崩溃。在上图中,代价估计用一个多项式表示,其系数 c 反应了硬件环境和算子特性,而
关于MySQL内核,一定要知道的! 近一个多月,写了一些MySQL内核的文字,稍作总结,希望对大家有帮助。1.《InnoDB,为何并发如此之高?》 文章介绍了: (1)什么是并发控制; (2)并发控制的常见方法:锁,数据多版本; (3)redo,undo,回滚段的实践; (4)InnoDB如何利用回滚段实现MVCC,实现快照读。
MySQL · 引擎特性 · MySQL内核对读写分离的支持 读写分离的场景应用 随着业务增长,数据越来越大,用户对数据的读取需求也随之越来越多,比如各种AP操作,都需要把数据从数据库中读取出来,用户可以通过开通多个只读实例,将读请求业务直接连接到只读实例上。使用RDS云数据库的读写分离功能,用户只需要一个请求地址,业务不需要做任何修改,由RDS自带的读写分离中间件服务来完成读写请求的路由及根据不同的只读实例规格进行不同的负载均衡,同时当只读实例出现故障时能够主动摘除,减少对用户的影响。
MySQL · 引擎特性 · Group Replication内核解析之二 前文已经介绍了MySQL的Group Replication的实现机制和原理,本文就Group Replication的具体实现进行详细的阐述,以更深入的理解Group Replication的机制,在实践中更好的应用Group Replication,提升应用系统的可用性,优化其性能。
db匠 rds内核团队秘密研发的全自动卖萌机. 追加特效: 发数据库内核月报. 月报传送: http://mysql.taobao.org/monthly/