Alluxio源码分析:RPC框架浅析(二)
Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。
继《Alluxio源码分析:RPC框架浅析(一)》一文后,本文继续讲解Alluxio中RPC实现。
3、Server端实现:RPC Server端口绑定、传输协议等参数设置、Server启动
AlluxioMaster是Alluxio中Master的实现,那么RPC服务端server自然就会落在它身上了。我们先看AlluxioMaster进程的启动main()方法,如下:
/** * Starts the Alluxio master server via {@code java -cp ALLUXIO-VERSION alluxio.Master}. * @param args there are no arguments used public static void main(String[] args) { // 启动master时参数应为空 if (args.length != 0) { LOG.info("java -cp {} alluxio.Master", Version.ALLUXIO_JAR); System.exit(-1); // validate the conf // 验证配置信息 if (!ValidateConf.validate()) { LOG.error("Invalid configuration found"); System.exit(-1); try { // 调用get()方法,返回AlluxioMaster实例master AlluxioMaster master = get(); // 调用实例master的start()方法,启动AlluxioMaster实例master master.start(); } catch (Exception e) { LOG.error("Uncaught exception terminating Master", e); System.exit(-1); }它主要干了两件事,一个就是调用get()方法,返回AlluxioMaster实例master,另一个就是调用实例master的start()方法,启动AlluxioMaster实例master。我们先看下get()方法,如下:
/** * Returns a handle to the Alluxio master instance. * @return Alluxio master handle public static synchronized AlluxioMaster get() { // 静态AlluxioMaster类型成员变量sAlluxioMaster为空时,通过Factory.create()构造一个,否则返回sAlluxioMaster if (sAlluxioMaster == null) { sAlluxioMaster = Factory.create(); return sAlluxioMaster; }而Factory的create()方法,则会根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例,FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例,代码如下:
/** * @return {@link FaultTolerantAlluxioMaster} if Alluxio configuration is set to use zookeeper, * otherwise, return {@link AlluxioMaster}. public static AlluxioMaster create() { // 根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例, // FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例 if (MasterContext.getConf().getBoolean(Constants.ZOOKEEPER_ENABLED)) { return new FaultTolerantAlluxioMaster(); return new AlluxioMaster(); }在AlluxioMasterd的构造方法中,涉及RPC相关的,主要是Worker最大和最小线程数、服务端Socker的TServerSocket实例mTServerSocket等的构造,关键代码如下:
Configuration conf = MasterContext.getConf(); // Worker最大和最小线程数:分别取参数alluxio.master.worker.threads.max和alluxio.master.worker.threads.min mMinWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MIN); mMaxWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MAX); // 获取传输提供者mTransportProvider mTransportProvider = TransportProvider.Factory.create(conf); // 构造TServerSocket实例mTServerSocket // port取参数alluxio.master.port=19998 mTServerSocket = new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC, conf));
再看实例master的start()方法,也就是AlluxioMaster的start()方法,代码如下:
/** * Starts the Alluxio master server. * @throws Exception if starting the master fails public void start() throws Exception { startMasters(true); // 启动服务 startServing(); }继续看startServing()方法,如下:
protected void startServing(String startMessage, String stopMessage) { mMasterMetricsSystem.start(); // 启动web服务 startServingWebServer(); LOG.info("Alluxio Master version {} started @ {} {}", Version.VERSION, mMasterAddress, startMessage); // 启动RPC服务 startServingRPCServer(); LOG.info("Alluxio Master version {} ended @ {} {}", Version.VERSION, mMasterAddress, stopMessage); }撇开启动web服务不说,我们看下启动RPC服务的startServingRPCServer()方法,如下:
protected void startServingRPCServer() { // set up multiplexed thrift processors // 构造多路复用处理器TMultiplexedProcessor实例processor TMultiplexedProcessor processor = new TMultiplexedProcessor(); // 注册BlockMaster服务 registerServices(processor, mBlockMaster.getServices()); // 注册FileSystemMaster服务 registerServices(processor, mFileSystemMaster.getServices()); // 必要的话,注册LineageMaster服务 if (LineageUtils.isLineageEnabled(MasterContext.getConf())) { registerServices(processor, mLineageMaster.getServices()); // register additional masters for RPC service // 注册额外的Masters服务 for (Master master : mAdditionalMasters) { registerServices(processor, master.getServices()); // Return a TTransportFactory based on the authentication type TTransportFactory transportFactory; try { // 获得传输工厂实例 transportFactory = mTransportProvider.getServerTransportFactory(); } catch (IOException e) { throw Throwables.propagate(e); // create master thrift service with the multiplexed processor. // 构造TThreadPoolServer实例时需要的参数: // 服务端Socker:TServerSocket类型实例mTServerSocket // 最大Worker线程数mMaxWorkerThreads // 最小Worker线程数mMinWorkerThreads // 处理器processor // 传输工厂transportFactory // 协议工厂TBinaryProtocol:二进制协议TBinaryProtocol Args args = new TThreadPoolServer.Args(mTServerSocket).maxWorkerThreads(mMaxWorkerThreads) .minWorkerThreads(mMinWorkerThreads).processor(processor).transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory(true, true)); if (MasterContext.getConf().getBoolean(Constants.IN_TEST_MODE)) { args.stopTimeoutVal = 0; } else { args.stopTimeoutVal = Constants.THRIFT_STOP_TIMEOUT_SECONDS; // 构造TThreadPoolServer实例mMasterServiceServer mMasterServiceServer = new TThreadPoolServer(args); // start thrift rpc server // 标志位正在提供服务mIsServing设置为true mIsServing = true; // 启动时间mStartTimeMs取当前时间 mStartTimeMs = System.currentTimeMillis(); // 启动TThreadPoolServer服务 mMasterServiceServer.serve(); }startServingRPCServer()方法主要处理流程如下:
1、构造多路复用处理器TMultiplexedProcessor实例processor;
2、调用registerServices()方法,注册BlockMaster、FileSystemMaster、LineageMaster、额外的Masters等各种服务;
3、调用TransportProvider的getServerTransportFactory()方法获得传输工厂实例transportFactory;
4、构造TThreadPoolServer实例时需要的参数:包括:
(1)服务端Socker:TServerSocket类型实例mTServerSocket;
(2)最大Worker线程数mMaxWorkerThreads;
(3)最小Worker线程数mMinWorkerThreads;
(4)多路复用处理器processor;
(5)传输工厂transportFactory;
(6)协议工厂TBinaryProtocol:二进制协议TBinaryProtocol;
5、构造TThreadPoolServer实例mMasterServiceServer;
6、标志位正在提供服务mIsServing设置为true;
7、启动时间mStartTimeMs取当前时间;
8、调用mMasterServiceServer的serve()方法启动TThreadPoolServer服务。
先以FileSystemMaster服务为例,看下RPC服务是如何注册的,代码如下:
private void registerServices(TMultiplexedProcessor processor, Map String, TProcessor services) { for (Map.Entry String, TProcessor service : services.entrySet()) { processor.registerProcessor(service.getKey(), service.getValue()); }
注册很简单,多路复用处理器processor的registerProcessor()方法即可完成注册,关键是要看注册的是什么东西它的服务是通过FileSystemMaster的getServices()方法获取的,我们跟踪下:
@Override public Map String, TProcessor getServices() { Map String, TProcessor services = new HashMap String, TProcessor // FileSystemMasterClientService服务 services.put( // key为"FileSystemMasterClient" Constants.FILE_SYSTEM_MASTER_CLIENT_SERVICE_NAME, // 可以看出,FileSystemMasterClientService服务Master端实现者是FileSystemMasterClientServiceHandler new FileSystemMasterClientService.Processor FileSystemMasterClientServiceHandler ( new FileSystemMasterClientServiceHandler(this))); // FileSystemMasterWorkerService服务 services.put( // key为"FileSystemMasterWorker" Constants.FILE_SYSTEM_MASTER_WORKER_SERVICE_NAME, // 可以看出,FileSystemMasterWorkerService服务Master端实现者是FileSystemMasterWorkerServiceHandler new FileSystemMasterWorkerService.Processor FileSystemMasterWorkerServiceHandler ( new FileSystemMasterWorkerServiceHandler(this))); return services; }就俩服务:FileSystemMasterClientService和FileSystemMasterWorkerService,它们在Master端的实现者分别是FileSystemMasterClientServiceHandler和FileSystemMasterWorkerServiceHandler,并且是通过各自Service的Processor来构造的,看到这里,你似乎应该明白什么了吧!这就是Processor的用途。
再看下获得传输工厂实例transportFactory,它是通过TransportProvider实例mTransportProvider的getServerTransportFactory()方法来获取的,而mTransportProvider的初始化也是在AlluxioMaster构造方法中,通过TransportProvider.Factory.create(conf)来获取的,我们看下代码:
public static TransportProvider create(Configuration conf) { AuthType authType = conf.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class); switch (authType) { case NOSASL: return new NoSaslTransportProvider(conf); case SIMPLE: // intended to fall through case CUSTOM: return new PlainSaslTransportProvider(conf); case KERBEROS: throw new UnsupportedOperationException( "getClientTransport: Kerberos is not supported currently."); default: throw new UnsupportedOperationException( "getClientTransport: Unsupported authentication type: " + authType.getAuthName()); }它目前仅支持NOSASL和CUSTOM两种类型,分别对应NoSaslTransportProvider和PlainSaslTransportProvider两个类。我们以CUSTOM类型的PlainSaslTransportProvider为例来看下getServerTransportFactory()方法,代码如下:
@Override public TTransportFactory getServerTransportFactory() throws SaslException { AuthType authType = mConfiguration.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class); TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); AuthenticationProvider provider = AuthenticationProvider.Factory.create(authType, mConfiguration); saslFactory .addServerDefinition(PlainSaslServerProvider.MECHANISM, null, null, new HashMap String, String (), new PlainSaslServerCallbackHandler(provider)); return saslFactory; }剩余的TThreadPoolServer实例构造、参数选择等上面解释的已经很清晰,读者可自行分析。
未完待续,请关注《Alluxio源码分析:RPC框架浅析(三)》!
RPC框架:从原理到选型,一文带你搞懂RPC(二) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
RPC框架:从原理到选型,一文带你搞懂RPC(三) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
RPC框架:从原理到选型,一文带你搞懂RPC(一) RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
你有必要了解一下Flink底层RPC使用的框架和原理 5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗? 欢迎您关注《大数据成神之路》 对于Flink中各个组件(JobMaster、TaskManager、Dispatcher等),其底层RPC框架基于Akka实现,本文着重分析Flink中的Rpc框架实现机制及梳理其通信流程。
高并发架构系列:如何从0到1设计一个类Dubbo的RPC框架 在过去持续分享的几十期阿里Java面试题中,几乎每次都会问到Dubbo相关问题,比如:“如何从0到1设计一个Dubbo的RPC框架”,这个问题主要考察以下几个方面: 你对RPC框架的底层原理掌握程度。 考验你的整体RPC框架系统设计能力。本文详解~
相关文章
- 数据源管理 | 基于DataX组件,同步数据和源码分析
- Sql Server 删除数据表的存储过程,直接能用!(源码带说明)
- 源码解析--图数据hugegraph的后端存储底层数据结构
- UE4 和 H5 互相通信、Github 上访问 Unreal Engine 的源码
- 恒玄科技BES250解决方案之双耳链接调试总结和源码分析
- Java小白进阶系列——Java锁框架AQS源码分析目录大纲
- Java集合框架之二:LinkedList源码解析
- 苹果最新框架教程之 ScreenCaptureKit使用含项目源码
- iOS Swift 人工智能相机完成项目之宠物自动拍摄APP基于机器视觉(项目含源码)
- SwiftUI 随机数使用大全之随机生成字符串、数字、小数(教程含源码)
- macOS SwiftUI 教程之 06 绘图实现正弦波,通过调整频率和步进值来更新正弦波视图 (教程含源码)
- macOS SwiftUI 经典框架之上下瀑布式工具栏框架(教程含源码)
- Python 制作软硬件交互游戏设备之太空探索者游戏(教程含源码)
- Aspects框架的源码解读及问题解析
- SwiftUI Widget 布局之垂直等高文字标签 (教程含源码)
- 用c#开发微信 (4) 基于Senparc.Weixin框架的接收事件推送处理 (源码下载)
- arcgis api 3.x for js 热力图优化篇-不依赖地图服务(附源码下载)
- ASP.NET MVC+EF框架+EasyUI实现权限管理系列(24)-权限组的设计和实现(附源码)(终结)
- Linux系统源码安装软件过程中configure选项-prefix的作用
- hadoop2.7之Mapper/reducer源码分析