zl程序教程

您现在的位置是:首页 >  其他

当前栏目

探索Wiredtiger引擎基于B-Tree数据写入分析

引擎数据 分析 探索 基于 写入 Tree
2023-06-13 09:17:21 时间

Wiredtiger支持的存储模型

  • Wiredtiger目前支持btreelsm的存储模型,也是唯一一个支持2种存储模型的kv磁盘存储引擎

Wiredtiger数据插入实现概览

  • 如下是Wiredtiger引擎的数据插入的的整体概览,从数据库连接初始化->会话初始化->游标初始化->调用游标的内置函数进行数据插入操作

Wiredtiger数据插入实现分析

wiredtiger_open函数
  • wiredtiger_open函数是整个wt引擎的初始化函数,定义了WT_CONNECTION的虚函数操作表,wt引擎参数初始化、内部线程初始化等工作,这里我们着重关心db中WT_CONNECTION默认值。
// 通过传入WT_CONNECTION **类型参数初始化WT_CONNECTION,设置WT_CONNECTION操作函数表
int wiredtiger_open(...)
{
	// WT_CONNECTION
    static const WT_CONNECTION stdc = {__conn_close, __conn_debug_info, __conn_reconfigure,
      __conn_get_home, __conn_configure_method, __conn_is_new, __conn_open_session,
      __conn_query_timestamp, __conn_set_timestamp, __conn_rollback_to_stable,
      __conn_load_extension, __conn_add_data_source, __conn_add_collator, __conn_add_compressor,
      __conn_add_encryptor, __conn_add_extractor, __conn_set_file_system, __conn_add_storage_source,
      __conn_get_storage_source, __conn_get_extension_api};

	// WT_CONNECTION具体实现的结构 
	WT_CONNECTION_IMPL *conn;
    WT_RET(__wt_calloc_one(NULL, &conn));
    conn->iface = stdc;

	// WT_CONNECTION的初始化设置
	*connectionp = &conn->iface;
	/*********其他初始化******/
}
open_session函数
  • conn->open_session函数实际调用的是WT_CONNECTION->open_session对应的__conn_open_session函数.这个函数核心初始化session比如设置session的函数表、会话的事务初始化等工作
// WT_CONNECTION->open_session的具体实现
// 参数中包含WT_SESSION **wt_sessionp,初始化session和设置session的函数操作表
static int __conn_open_session(...)
{
	 WT_SESSION_IMPL  *session_ret;
	 WT_ERR(__wt_open_session(conn, event_handler, config, true, &session_ret))
	 {
		 /* Acquire a session. */
	    WT_RET(__open_session(conn, event_handler, config, &session))
	    {
		    static const WT_SESSION
      stds = {NULL, NULL, __session_close, __session_reconfigure, __session_flush_tier,
        __wt_session_strerror, __session_open_cursor, __session_alter, __session_create,
        __wt_session_compact, __session_drop, __session_join, __session_log_flush,
        __session_log_printf, __session_rename, __session_reset, __session_salvage,
        __session_truncate, __session_upgrade, __session_verify, __session_begin_transaction,
        __session_commit_transaction, __session_prepare_transaction, __session_rollback_transaction,
        __session_query_timestamp, __session_timestamp_transaction,
        __session_timestamp_transaction_uint, __session_checkpoint, __session_reset_snapshot,
        __session_transaction_pinned_range, __session_get_rollback_reason, __wt_session_breakpoint},
      stds_readonly = {NULL, NULL, __session_close, __session_reconfigure, __session_flush_tier,
        __wt_session_strerror, __session_open_cursor, __session_alter_readonly,
        __session_create_readonly, __wt_session_compact_readonly, __session_drop_readonly,
        __session_join, __session_log_flush_readonly, __session_log_printf_readonly,
        __session_rename_readonly, __session_reset, __session_salvage_readonly,
        __session_truncate_readonly, __session_upgrade_readonly, __session_verify,
        __session_begin_transaction, __session_commit_transaction,
        __session_prepare_transaction_readonly, __session_rollback_transaction,
        __session_query_timestamp, __session_timestamp_transaction,
        __session_timestamp_transaction_uint, __session_checkpoint_readonly,
        __session_reset_snapshot, __session_transaction_pinned_range, __session_get_rollback_reason,
        __wt_session_breakpoint};
	    }
	 }
   
	 session_ret->iface = F_ISSET(conn, WT_CONN_READONLY) ? stds_readonly : stds;
    session_ret->iface.connection = &conn->iface;
	 *wt_sessionp = &session_ret->iface;
	/*******其他初始化******/
}
create函数
  • create函数是session中的create函数指针,实际对应的函数是__session_create,这个函数的核心职责是检查创建的数据表是否在系统中存在->构建新表的元数据信息->在数据库中创建表的数据文件->表对应的元数据信息插入到系统表WiredTiger.wt中,经历这个几个过程整个的schema(表)创建成功。table开头的uri是针对元数据而言,file来头的uri对应的是物理磁盘上的表的结构
static int
__session_create(...const char *uri...)
{
	__wt_session_create(session, uri, config)
	{
		__wt_schema_create(session, uri, config)
		{
			// 这里的uri=table:access
			__schema_create(int_session, uri, config)
			{
				__create_table(session, uri, exclusive, config)
				{
					// 检查创建表的meta数据是否存在
					__wt_metadata_search(session, uri, &tablecfg)
					// 构造数据表的meta信息
					__wt_import_repair(session, filename, &filecfg)
					// 创建表的元数据插入到数据库元数据系统表中(file:WiredTiger.wt),元数据中的key=uri表的uri,value=tablecfg配置信息
					__wt_metadata_insert(session, uri, tablecfg)
					// 这里对应的uri=colgroup:access
					__create_colgroup(session, cgname, exclusive, cgcfg)
					{
						
						__wt_schema_create(uri...)
						{
							// __schema_create函数里调用__create_file
							// uri=file:access.wt
							__create_file(uri...)
							{
								__wt_metadata_insert(...)
								{
									// 插入file:access.wt的元数据
									__curfile_insert(...)
								}
							}
						}
					}
				}
			}
		}
	}
}
open_cursor函数
  • open_cursorWT_SESSION->open_cursor函数指针的实现,其实际的实现是__session_open_cursor函数,根据不同的uri关键字来负责初始化游标和设置游标中的操作函数。
static int __session_open_cursor(...)
{
	__session_open_cursor_int()
	{
		switch (uri[0]) {
			// 初始化btree游标
			case `f`:  
			__wt_curfile_open(session, uri, owner, cfg, cursorp);
			/***其他的case****/
		}
	}
}
// uri = file:access.wt,设置btree插入的函数操作表
int __wt_curfile_open(...uri)
{
	__curfile_create(...)
	{
		WT_CURSOR_STATIC_INIT(iface, __wt_cursor_get_key, /* get-key */
	      __wt_cursor_get_value,                          /* get-value */
	      __wt_cursor_set_key,                            /* set-key */
	      __wt_cursor_set_value,                          /* set-value */
	      __curfile_compare,                              /* compare */
	      __curfile_equals,                               /* equals */
	      __curfile_next,                                 /* next */
	      __curfile_prev,                                 /* prev */
	      __curfile_reset,                                /* reset */
	      __curfile_search,                               /* search */
	      __curfile_search_near,                          /* search-near */
	      __curfile_insert,                               /* insert */
	      __wt_cursor_modify_value_format_notsup,         /* modify */
	      __curfile_update,                               /* update */
	      __curfile_remove,                               /* remove */
	      __curfile_reserve,                              /* reserve */
	      __wt_cursor_reconfigure,                        /* reconfigure */
	      __wt_cursor_largest_key,                        /* largest_key */
	      __wt_cursor_bound,                              /* bound */
	      __curfile_cache,                                /* cache */
	      __curfile_reopen,                               /* reopen */
	      __wt_cursor_checkpoint_id,                      /* checkpoint ID */
	      __curfile_close);


		// cursor的初始化
		WT_RET(__wt_calloc(session, 1, csize, &cbt));
	    cursor = &cbt->iface;
	    *cursor = iface;
	    cursor->session = (WT_SESSION *)session;
	    cursor->internal_uri = btree->dhandle->name;
	    cursor->key_format = btree->key_format;
	    cursor->value_format = btree->value_format;
    
		// 对应的btree初始化
		__wt_btcur_open(cbt);
	}
}
insert函数
  • insert函数是cursor的函数指针,实际的调用的函数是__curfile_insert函数,把数据插入到btree中,最终的数据落盘还是依赖于checkpoint线程来刷盘
// WT_CURSOR->insert具体实现
static int __curfile_insert(WT_CURSOR *cursor)
{
	WT_ERR(__wt_btcur_insert(cbt));
}

Wiredtiger数据插入的示例代码

#include <wiredtiger_ext.h>
#include <wiredtiger.h>
#include <unistd.h>
#include <assert.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
static void insert_kv(const char *home,char *counter_str)
{
    /*! [access example connection] */
    WT_CONNECTION *conn;
    WT_CURSOR *cursor;
    WT_SESSION *session;
    const char *key, *value;
    int ret;
    mkdir(home,0755);
    /* Open a connection to the database, creating it if necessary. */
    assert(wiredtiger_open(home, NULL, "create", &conn)!=-1);

    /* Open a session handle for the database. */
    assert(conn->open_session(conn, NULL, NULL, &session)!=-1);
    /*! [access example connection] */


    /*! [access example table create] */
    assert(session->create(session, "table:access", "key_format=S,value_format=S")!=-1);
    /*! [access example table create] */

    /*! [access example cursor open] */
    assert(session->open_cursor(session, "table:access", NULL, NULL, &cursor)!=-1);
    /*! [access example cursor open] */
    uint64_t i = 0;
    char *endptr=NULL;
    uint64_t count= strtol(counter_str, &endptr, 10);
    fprintf(stdout,"pid=%ld\n",getpid());  
    sleep(15);
    while (true) {
        if (i >= count) {
            break;
        }
        char k_buf[1024] = {'\0'};
        char v_buf[1024] = {'\0'};
        snprintf((char *)&k_buf, 1024, "key-%ld", i);
        snprintf((char *)&v_buf, 1024, "value-%ld", i);

        /*! [access example cursor insert] */
        cursor->set_key(cursor,(char *)&k_buf); /* Insert a record. */
        cursor->set_value(cursor, (char *)&v_buf);
        assert(cursor->insert(cursor)!=-1);
        usleep(100);
        i++;
    }

    assert(conn->close(conn, NULL)!=-1); /* Close all handles. */
                                          /*! [access example close] */
}

int
main(int argc, char *argv[])
{

    insert_kv(argv[1],argv[2]);

    return 0;
}