zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Alluxio源码分析:RPC框架浅析(二)

源码框架 分析 浅析 RPC ALLuxio
2023-09-27 14:29:33 时间

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。

        继《Alluxio源码分析:RPC框架浅析(一)》一文后,本文继续讲解Alluxio中RPC实现。

        3、Server端实现:RPC Server端口绑定、传输协议等参数设置、Server启动

        AlluxioMaster是Alluxio中Master的实现,那么RPC服务端server自然就会落在它身上了。我们先看AlluxioMaster进程的启动main()方法,如下:

 /**

 * Starts the Alluxio master server via {@code java -cp ALLUXIO-VERSION alluxio.Master}.

 * @param args there are no arguments used

 public static void main(String[] args) {

 // 启动master时参数应为空

 if (args.length != 0) {

 LOG.info("java -cp {} alluxio.Master", Version.ALLUXIO_JAR);

 System.exit(-1);

 // validate the conf

 // 验证配置信息

 if (!ValidateConf.validate()) {

 LOG.error("Invalid configuration found");

 System.exit(-1);

 try {

 // 调用get()方法,返回AlluxioMaster实例master

 AlluxioMaster master = get();

 // 调用实例master的start()方法,启动AlluxioMaster实例master

 master.start();

 } catch (Exception e) {

 LOG.error("Uncaught exception terminating Master", e);

 System.exit(-1);

 }
        它主要干了两件事,一个就是调用get()方法,返回AlluxioMaster实例master,另一个就是调用实例master的start()方法,启动AlluxioMaster实例master。我们先看下get()方法,如下:

 /**

 * Returns a handle to the Alluxio master instance.

 * @return Alluxio master handle

 public static synchronized AlluxioMaster get() {

 // 静态AlluxioMaster类型成员变量sAlluxioMaster为空时,通过Factory.create()构造一个,否则返回sAlluxioMaster

 if (sAlluxioMaster == null) {

 sAlluxioMaster = Factory.create();

 return sAlluxioMaster;

 }
        而Factory的create()方法,则会根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例,FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例,代码如下:

 /**

 * @return {@link FaultTolerantAlluxioMaster} if Alluxio configuration is set to use zookeeper,

 * otherwise, return {@link AlluxioMaster}.

 public static AlluxioMaster create() {

 // 根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例,

 // FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例

 if (MasterContext.getConf().getBoolean(Constants.ZOOKEEPER_ENABLED)) {

 return new FaultTolerantAlluxioMaster();

 return new AlluxioMaster();

 }
        在AlluxioMasterd的构造方法中,涉及RPC相关的,主要是Worker最大和最小线程数、服务端Socker的TServerSocket实例mTServerSocket等的构造,关键代码如下:

 Configuration conf = MasterContext.getConf();

 // Worker最大和最小线程数:分别取参数alluxio.master.worker.threads.max和alluxio.master.worker.threads.min

 mMinWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MIN);

 mMaxWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MAX);

 // 获取传输提供者mTransportProvider

 mTransportProvider = TransportProvider.Factory.create(conf);

 // 构造TServerSocket实例mTServerSocket

 // port取参数alluxio.master.port=19998

 mTServerSocket =

 new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC, conf));

        再看实例master的start()方法,也就是AlluxioMaster的start()方法,代码如下:

 /**

 * Starts the Alluxio master server.

 * @throws Exception if starting the master fails

 public void start() throws Exception {

 startMasters(true);

 // 启动服务

 startServing();

 }
        继续看startServing()方法,如下:

 protected void startServing(String startMessage, String stopMessage) {

 mMasterMetricsSystem.start();

 // 启动web服务

 startServingWebServer();

 LOG.info("Alluxio Master version {} started @ {} {}", Version.VERSION, mMasterAddress,

 startMessage);

 // 启动RPC服务

 startServingRPCServer();

 LOG.info("Alluxio Master version {} ended @ {} {}", Version.VERSION, mMasterAddress,

 stopMessage);

 }
        撇开启动web服务不说,我们看下启动RPC服务的startServingRPCServer()方法,如下:

 protected void startServingRPCServer() {

 // set up multiplexed thrift processors

 // 构造多路复用处理器TMultiplexedProcessor实例processor

 TMultiplexedProcessor processor = new TMultiplexedProcessor();

 // 注册BlockMaster服务

 registerServices(processor, mBlockMaster.getServices());

 // 注册FileSystemMaster服务

 registerServices(processor, mFileSystemMaster.getServices());

 // 必要的话,注册LineageMaster服务

 if (LineageUtils.isLineageEnabled(MasterContext.getConf())) {

 registerServices(processor, mLineageMaster.getServices());

 // register additional masters for RPC service

 // 注册额外的Masters服务

 for (Master master : mAdditionalMasters) {

 registerServices(processor, master.getServices());

 // Return a TTransportFactory based on the authentication type

 TTransportFactory transportFactory;

 try {

 // 获得传输工厂实例

 transportFactory = mTransportProvider.getServerTransportFactory();

 } catch (IOException e) {

 throw Throwables.propagate(e);

 // create master thrift service with the multiplexed processor.

 // 构造TThreadPoolServer实例时需要的参数:

 // 服务端Socker:TServerSocket类型实例mTServerSocket

 // 最大Worker线程数mMaxWorkerThreads

 // 最小Worker线程数mMinWorkerThreads

 // 处理器processor

 // 传输工厂transportFactory

 // 协议工厂TBinaryProtocol:二进制协议TBinaryProtocol

 Args args = new TThreadPoolServer.Args(mTServerSocket).maxWorkerThreads(mMaxWorkerThreads)

 .minWorkerThreads(mMinWorkerThreads).processor(processor).transportFactory(transportFactory)

 .protocolFactory(new TBinaryProtocol.Factory(true, true));

 if (MasterContext.getConf().getBoolean(Constants.IN_TEST_MODE)) {

 args.stopTimeoutVal = 0;

 } else {

 args.stopTimeoutVal = Constants.THRIFT_STOP_TIMEOUT_SECONDS;

 // 构造TThreadPoolServer实例mMasterServiceServer

 mMasterServiceServer = new TThreadPoolServer(args);

 // start thrift rpc server

 // 标志位正在提供服务mIsServing设置为true

 mIsServing = true;

 // 启动时间mStartTimeMs取当前时间

 mStartTimeMs = System.currentTimeMillis();

 // 启动TThreadPoolServer服务

 mMasterServiceServer.serve();

 }
         startServingRPCServer()方法主要处理流程如下:

        1、构造多路复用处理器TMultiplexedProcessor实例processor;

        2、调用registerServices()方法,注册BlockMaster、FileSystemMaster、LineageMaster、额外的Masters等各种服务;

        3、调用TransportProvider的getServerTransportFactory()方法获得传输工厂实例transportFactory;

        4、构造TThreadPoolServer实例时需要的参数:包括:

             (1)服务端Socker:TServerSocket类型实例mTServerSocket;

             (2)最大Worker线程数mMaxWorkerThreads;

             (3)最小Worker线程数mMinWorkerThreads;

             (4)多路复用处理器processor;

             (5)传输工厂transportFactory;

             (6)协议工厂TBinaryProtocol:二进制协议TBinaryProtocol;

        5、构造TThreadPoolServer实例mMasterServiceServer;

        6、标志位正在提供服务mIsServing设置为true;

        7、启动时间mStartTimeMs取当前时间;

        8、调用mMasterServiceServer的serve()方法启动TThreadPoolServer服务。
        先以FileSystemMaster服务为例,看下RPC服务是如何注册的,代码如下:

 private void registerServices(TMultiplexedProcessor processor, Map String, TProcessor services) {

 for (Map.Entry String, TProcessor service : services.entrySet()) {

 processor.registerProcessor(service.getKey(), service.getValue());

 }

        注册很简单,多路复用处理器processor的registerProcessor()方法即可完成注册,关键是要看注册的是什么东西它的服务是通过FileSystemMaster的getServices()方法获取的,我们跟踪下:

 @Override

 public Map String, TProcessor getServices() {

 Map String, TProcessor services = new HashMap String, TProcessor 

 // FileSystemMasterClientService服务

 services.put(

 // key为"FileSystemMasterClient"

 Constants.FILE_SYSTEM_MASTER_CLIENT_SERVICE_NAME,

 // 可以看出,FileSystemMasterClientService服务Master端实现者是FileSystemMasterClientServiceHandler

 new FileSystemMasterClientService.Processor FileSystemMasterClientServiceHandler (

 new FileSystemMasterClientServiceHandler(this)));

 // FileSystemMasterWorkerService服务 

 services.put(

 // key为"FileSystemMasterWorker"

 Constants.FILE_SYSTEM_MASTER_WORKER_SERVICE_NAME,

 // 可以看出,FileSystemMasterWorkerService服务Master端实现者是FileSystemMasterWorkerServiceHandler

 new FileSystemMasterWorkerService.Processor FileSystemMasterWorkerServiceHandler (

 new FileSystemMasterWorkerServiceHandler(this)));

 return services;

 }
        就俩服务:FileSystemMasterClientService和FileSystemMasterWorkerService,它们在Master端的实现者分别是FileSystemMasterClientServiceHandler和FileSystemMasterWorkerServiceHandler,并且是通过各自Service的Processor来构造的,看到这里,你似乎应该明白什么了吧!这就是Processor的用途。
        再看下获得传输工厂实例transportFactory,它是通过TransportProvider实例mTransportProvider的getServerTransportFactory()方法来获取的,而mTransportProvider的初始化也是在AlluxioMaster构造方法中,通过TransportProvider.Factory.create(conf)来获取的,我们看下代码:

 public static TransportProvider create(Configuration conf) {

 AuthType authType = conf.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class);

 switch (authType) {

 case NOSASL:

 return new NoSaslTransportProvider(conf);

 case SIMPLE: // intended to fall through

 case CUSTOM:

 return new PlainSaslTransportProvider(conf);

 case KERBEROS:

 throw new UnsupportedOperationException(

 "getClientTransport: Kerberos is not supported currently.");

 default:

 throw new UnsupportedOperationException(

 "getClientTransport: Unsupported authentication type: " + authType.getAuthName());

 }
        它目前仅支持NOSASL和CUSTOM两种类型,分别对应NoSaslTransportProvider和PlainSaslTransportProvider两个类。我们以CUSTOM类型的PlainSaslTransportProvider为例来看下getServerTransportFactory()方法,代码如下:

 @Override

 public TTransportFactory getServerTransportFactory() throws SaslException {

 AuthType authType =

 mConfiguration.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class);

 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();

 AuthenticationProvider provider =

 AuthenticationProvider.Factory.create(authType, mConfiguration);

 saslFactory

 .addServerDefinition(PlainSaslServerProvider.MECHANISM, null, null,

 new HashMap String, String (), new PlainSaslServerCallbackHandler(provider));

 return saslFactory;

 }
        剩余的TThreadPoolServer实例构造、参数选择等上面解释的已经很清晰,读者可自行分析。

        未完待续,请关注《Alluxio源码分析:RPC框架浅析(三)》










RPC框架:从原理到选型,一文带你搞懂RPC(二) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
RPC框架:从原理到选型,一文带你搞懂RPC(三) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
RPC框架:从原理到选型,一文带你搞懂RPC(一) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
你有必要了解一下Flink底层RPC使用的框架和原理 5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗? 欢迎您关注《大数据成神之路》 对于Flink中各个组件(JobMaster、TaskManager、Dispatcher等),其底层RPC框架基于Akka实现,本文着重分析Flink中的Rpc框架实现机制及梳理其通信流程。
高并发架构系列:如何从0到1设计一个类Dubbo的RPC框架 在过去持续分享的几十期阿里Java面试题中,几乎每次都会问到Dubbo相关问题,比如:“如何从0到1设计一个Dubbo的RPC框架”,这个问题主要考察以下几个方面: 你对RPC框架的底层原理掌握程度。 考验你的整体RPC框架系统设计能力。本文详解~