zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

eos源码赏析(二十四):主网数据同步及落库MongoDB

2023-03-15 22:08:26 时间

对于eos源码有一定了解的同学应该知道,整个eos架构是插件化的,这样更好的降低了整个软件系统的耦合性,也使得各个插件之间交互的顺畅。我们在前面的文章中多次提到了producer_plugin(用于出块等功能)、history_plugin(用于查询等功能)、chain_plugin(命令行相关的操作等功能),还有两个很关键的插件我们没有介绍到,即net_plugin和mongo_db_plugin。net_plugin和区块数据的广播以及主网数据的同步有关,mongo_db_plugin用于存储区块、交易等信息,今天我们结合主网数据同步并落库到MongoDB的过程来简单的分析下这两个插件的使用。

本文主要包含以下内容:

  • 主网数据同步
  • 数据写入MongoDB

1、主网数据同步

关于主网数据如何同步至本地node并写入数据库的配置我们在这里不再赘述,可以通过搜索引擎获取相应的文件同时获取到最新的可用的p2p节点即可,如果在主网数据同步中遇到问题的朋友也可以在文章下留言或者在群内交流,我们今天来看主网数据同步的过程中做了哪些操作。

当我们同步主网数据的时候可以简单的把我们本地的node看做是一个client,通过p2p节点从主网获取区块、交易等消息。关于这个client的构建,有很多内容要写但并不是我们本文的重点,我们且来看看数据处理的过程,在net_plugin中有一个句柄函数,使用了visitor模式:

   struct msgHandler : public fc::visitor<void> {
      net_plugin_impl &impl;
      connection_ptr c;
      msgHandler( net_plugin_impl &imp, connection_ptr conn) : impl(imp), c(conn) {}

      template <typename T>
      void operator()(const T &msg) const
      {
         impl.handle_message( c, msg);
      }
   };

以区块信息为例,当我们接收到区块信息的时候回调用以下函数来处理,注意我们这里添加了部分日志打印区块的id、编号、区块生产的节点名等信息,受限于篇幅长度异常部分的代码省略,感兴趣的同学可自行打开源码进行查看:

void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
      controller &cc = chain_plug->chain();
      block_id_type blk_id = msg.id();
      uint32_t blk_num = msg.block_num();
      fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));
      c->cancel_wait();
      dlog("handle_message  signed_block,blk_id:${blk_id};blk_num:${blk_num};peer_name:${peer_name}", ("blk_id", blk_id)("blk_num", blk_num)("peer_name", c->peer_name()));
      try {
         //第一次验证,后面还会有好几次验证
         if( cc.fetch_block_by_id(blk_id)) {
            //net_plugin本身做一个区块信息的分发
            sync_master->recv_block(c, blk_id, blk_num);
            return;
         }
      }
      dispatcher->recv_block(c, blk_id, blk_num);
      fc::microseconds age( fc::time_point::now() - msg.timestamp);
      peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",
              ("n",blk_num)("age",age.to_seconds()));

      go_away_reason reason = fatal_other;
      try {
         //将区块信息送至chain_plugin
         signed_block_ptr sbp = std::make_shared<signed_block>(msg);
         chain_plug->accept_block(sbp); //, sync_master->is_active(c));
         reason = no_reason;
      } 
      //等等内容
   }

有些地方需要注意,我们在本文中为了方便跟踪代码,加了很多log打印。在接收到区块信息之后,还会对其进行第一次校验,即和本地fork_db中存在的区块的状态进行比较:

signed_block_ptr controller::fetch_block_by_id( block_id_type id )const {
   dlog(" fetch_block_by_id ${id} ", ("id", id));
   auto state = my->fork_db.get_block(id);
   if( state ) return state->block;
   auto bptr = fetch_block_by_number( block_header::num_from_id(id) );
   if( bptr && bptr->id() == id ) return bptr;
   return signed_block_ptr();
}

如果根据区块id或者block_num在本地的fork_db中查询了id或者block_num相同的区块信息,则返回该区块信息,如果本地fork_db中不存在,则返回一个区块状态的指针,用于存储接下来将要进行处理的区块信息,接下来的过程中可能还会存在多次fork_db校验的过程,我们将不再赘述。我们继续来看chain_plugin中对其进行了什么操作:

   void chain_plugin::accept_block(const signed_block_ptr& block ) {
   my->incoming_block_sync_method(block);
}

然后使用类似于信号槽的方式将区块信息传送至producer_plugin:

incoming::methods::block_sync::method_type&        incoming_block_sync_method;

在eos中使用了各种方式的回调、信号槽等,我们需要注意,继续来看这个block_sync信号走到了哪里:

void on_incoming_block(const signed_block_ptr& block) {
         dlog("producer plugin on_incoming_block,blk_id:${blk_id}", ("blk_id", block->id()));
         fc_dlog(_log, "received incoming block ${id}", ("id", block->id()));

         EOS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it" );


         chain::controller& chain = app().get_plugin<chain_plugin>().chain();

         /* de-dupe here... no point in aborting block if we already know the block */
         auto id = block->id();
         auto existing = chain.fetch_block_by_id( id );
         if( existing ) { return; }

         // abort the pending block
         chain.abort_block();

         // exceptions throw out, make sure we restart our loop
         auto ensure = fc::make_scoped_exit([this](){
            schedule_production_loop();
         });

         // push the new block
         bool except = false;
         try {
            chain.push_block(block);
         }
     //该函数底下来还进行了其他操作         
    }

我们可以看到在producer中处理的过程和我们出块的过程有一定的相似之处,但又不完全相同,其中的区别值得单独拿出来说,作为后话把。

先apply_block()然后commit_block()对区块信息进行确认。在commit_block()中对区块信息确认之后,使用信号槽的形式将区块信息广播至别的插件,是的又是我们熟悉的信号槽,有不太了解的同学也可以参考以前的文章。

eos源码赏析(八):EOS智能合约入门之区块生产

eos源码赏析(九):EOS智能合约入门之区块打包和广播机制

void commit_block( bool add_to_fork_db ) {
      auto reset_pending_on_exit = fc::make_scoped_exit([this]{
         pending.reset();
      });

      try {
         if (add_to_fork_db) {
            pending->_pending_block_state->validated = true;
            auto new_bsp = fork_db.add(pending->_pending_block_state);
             dlog("controller commit_block emit accepted_block_header");
            emit(self.accepted_block_header, pending->_pending_block_state);
            head = fork_db.head();
            EOS_ASSERT(new_bsp == head, fork_database_exception, "committed block did not become the new head in fork database");
         }

         if( !replaying ) {
            reversible_blocks.create<reversible_block_object>( [&]( auto& ubo ) {
               ubo.blocknum = pending->_pending_block_state->block_num;
               ubo.set_block( pending->_pending_block_state->block );
            });
         }
          dlog("controller commit_block emit accepted_block");
         emit( self.accepted_block, pending->_pending_block_state );
      } catch (...) {
         // dont bother resetting pending, instead abort the block
         reset_pending_on_exit.cancel();
         abort_block();
         throw;
      }

      // push the state for pending.
      pending->push();
   }

在commit_block中分别对区块状态头及区块状态进行了信号的发射,对信号槽有一定了解的朋友应该知道:信号槽机制中一个信号是可以对应多个槽函数的,因此我们不仅在mongo_db_plugin中收到了区块的信息,在别的已经绑定了accept_block这个信号的地方也会收到相应的信息,如net_plugin中就有收到:

   void net_plugin_impl::accepted_block(const block_state_ptr& block) {
       dlog("net_plugin_impl::accepted_block,blk_id:${blk_id}", ("blk_id", block->id));
      fc_dlog(logger,"signaled, id = ${id}",("id", block->id));
      dispatcher->bcast_block(*block->block);
   }
2、数据写入MongoDB

在mongo_db_plugin接收到区块信息之后,会将区块写入一个队列中,然后以异步的方式去轮询读取队列中的区块信息并将其写入数据库中,先来看接收区块的槽函数:

void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) {
   try {
       dlog("mongo accepted_block id: ${id},block_num:${num}", ("id", bs->id)("num", bs->block_num));
      if( !start_block_reached ) {
         if( bs->block_num >= start_block_num ) {
            start_block_reached = true;
         }
      }
      if( store_blocks || store_block_states ) {
             //放到一个队列里面
         queue( block_state_queue, bs );
      }
   } catch (fc::exception& e) {
      elog("FC Exception while accepted_block ${e}", ("e", e.to_string()));
   } catch (std::exception& e) {
      elog("STD Exception while accepted_block ${e}", ("e", e.what()));
   } catch (...) {
      elog("Unknown exception while accepted_block");
   }
}

然后去消费这些存在于队列中的区块信息,在这里需要说明的是MongoDB中存有哪些数据呢?我们可以用以下的指令来查看:

> show dbs
admin    0.000GB
config   0.000GB
eosmain  0.000GB
local    0.000GB
> use eosmain
switched to db eosmain
> show collections
account_controls
accounts
action_traces
block_states
blocks
pub_keys
transaction_traces
transactions
> 

这是我在同步主网数据之前,可以看到eosmain所占存储空间为0,当数据开始同步之后会随着数据的增加所占磁盘空间缓慢增加,如不太习惯使用这种方式查询,也可以使用图形化界面的Navicat等进行查询。整个数据库中有八张表,本文使用block_states演示,我们继续来看block是如何存储的:

 void mongo_db_plugin_impl::consume_blocks() {
   try {
      auto mongo_client = mongo_pool->acquire();
      auto& mongo_conn = *mongo_client;

      _accounts = mongo_conn[db_name][accounts_col];
      _trans = mongo_conn[db_name][trans_col];
      _trans_traces = mongo_conn[db_name][trans_traces_col];
      _action_traces = mongo_conn[db_name][action_traces_col];
      _blocks = mongo_conn[db_name][blocks_col];
      _block_states = mongo_conn[db_name][block_states_col];
      _pub_keys = mongo_conn[db_name][pub_keys_col];
      _account_controls = mongo_conn[db_name][account_controls_col];

      while (true) {
         boost::mutex::scoped_lock lock(mtx);
         while ( transaction_metadata_queue.empty() &&
                 transaction_trace_queue.empty() &&
                 block_state_queue.empty() &&
                 irreversible_block_state_queue.empty() &&
                 !done ) {
            condition.wait(lock);
         }
         }
      size_t block_state_size = block_state_queue.size();
//         if (block_state_size > 0) {
//            block_state_process_queue = move(block_state_queue);
//            block_state_queue.clear();
//         }

从队列里面取出block_state之后,使用相应的处理函数进行处理并落库,这里使用了MongoDB的c++开发驱动:

void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr& bs ) {
   using namespace bsoncxx::types;
   using namespace bsoncxx::builder;
   using bsoncxx::builder::basic::kvp;
   using bsoncxx::builder::basic::make_document;

   mongocxx::options::update update_opts{};
   update_opts.upsert( true );

   auto block_num = bs->block_num;
   if( block_num % 1000 == 0 )
      ilog( "block_num: ${b}", ("b", block_num) );
   const auto& block_id = bs->id;
   const auto block_id_str = block_id.str();

   auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
         std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});

   if( store_block_states ) {
      auto block_state_doc = bsoncxx::builder::basic::document{};
      block_state_doc.append( kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
                              kvp( "block_id", block_id_str ),
                              kvp( "validated", b_bool{bs->validated} ) );

      const chain::block_header_state& bhs = *bs;

      auto json = fc::json::to_string( bhs );
      try {
         const auto& value = bsoncxx::from_json( json );
         block_state_doc.append( kvp( "block_header_state", value ) );
      } catch( bsoncxx::exception& ) {
         try {
            json = fc::prune_invalid_utf8( json );
            const auto& value = bsoncxx::from_json( json );
            block_state_doc.append( kvp( "block_header_state", value ) );
            block_state_doc.append( kvp( "non-utf8-purged", b_bool{true} ) );
         } catch( bsoncxx::exception& e ) {
            elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()) );
            elog( "  JSON: ${j}", ("j", json) );
         }
      }
      block_state_doc.append( kvp( "createdAt", b_date{now} ) );

      try {
         if( !_block_states.update_one( make_document( kvp( "block_id", block_id_str ) ),
                                        make_document( kvp( "$set", block_state_doc.view() ) ), update_opts ) ) {
            EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id) );
         }
      } catch( ... ) {
         handle_mongo_exception( "block_states insert: " + json, __LINE__ );
      }
   }

至此,就完成了一个数据从主网同步到数据落库的一个过程。

本文主要基于net_plugin和mongo_db_plugin介绍了主网数据同步及数据落库的一个过程。有一些建议:

1,多加log打印跟踪。

2,善用图形化界面工具操作数据库

当然还有很多不完善的地方,欢迎补充。