Storm-源码分析- Thrift的使用
首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service
然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码
比如对于nimbus service
在IDL的定义为,
service Nimbus { void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite); void killTopology(1: string name) throws (1: NotAliveException e); void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e); void activate(1: string name) throws (1: NotAliveException e); void deactivate(1: string name) throws (1: NotAliveException e); void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite); // need to add functions for asking about status of storms, what nodes theyre running on, looking at task logs string beginFileUpload(); void uploadChunk(1: string location, 2: binary chunk); void finishFileUpload(1: string location); string beginFileDownload(1: string file); //can stop downloading chunks when receive 0-length byte array back binary downloadChunk(1: string id); // returns json string getNimbusConf(); // stats functions ClusterSummary getClusterInfo(); TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e); //returns json string getTopologyConf(1: string id) throws (1: NotAliveException e); StormTopology getTopology(1: string id) throws (1: NotAliveException e); StormTopology getUserTopology(1: string id) throws (1: NotAliveException e); }
而对应在Nimbus.java的Java代码如下,
public class Nimbus { public interface Iface { public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException; public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException; public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException; public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException; public void activate(String name) throws NotAliveException, org.apache.thrift7.TException; public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException; public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException; public String beginFileUpload() throws org.apache.thrift7.TException; public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException; public void finishFileUpload(String location) throws org.apache.thrift7.TException; public String beginFileDownload(String file) throws org.apache.thrift7.TException; public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException; public String getNimbusConf() throws org.apache.thrift7.TException; public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException; public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException; public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException; public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException; public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException; }
2 Client
1. 首先Get Client,
NimbusClient client = NimbusClient.getConfiguredClient(conf);
看看backtype.storm.utils下面的client.getConfiguredClient的逻辑,
只是从配置中取出nimbus的host:port, 并new NimbusClient
public static NimbusClient getConfiguredClient(Map conf) { try { String nimbusHost = (String) conf.get(Config.NIMBUS_HOST); int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)); return new NimbusClient(conf, nimbusHost, nimbusPort); } catch (TTransportException ex) { throw new RuntimeException(ex); }
NimbusClient 继承自ThriftClient, public class NimbusClient extends ThriftClient
ThriftClient又做了什么? 关键是怎么进行数据序列化和怎么将数据传输到remote
这里看出Thrift对Transport和Protocol的封装
对于Transport, 其实就是对Socket的封装, 使用TSocket(host, port)
然后对于protocol, 默认使用TBinaryProtocol, 如果你不指定的话
public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException { try { //locate login configuration Configuration login_conf = AuthUtils.GetConfiguration(storm_conf); //construct a transport plugin ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf); //create a socket with server if(host==null) { throw new IllegalArgumentException("host is not set"); if(port =0) { throw new IllegalArgumentException("invalid port: "+port); TSocket socket = new TSocket(host, port); if(timeout!=null) { socket.setTimeout(timeout); final TTransport underlyingTransport = socket; //establish client-server transport via plugin _transport = transportPlugin.connect(underlyingTransport, host); } catch (IOException ex) { throw new RuntimeException(ex); _protocol = null; if (_transport != null) _protocol = new TBinaryProtocol(_transport); }
2. 调用任意RPC
那么就看看submitTopologyWithOpts
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
可以看出上面的Nimbus的interface里面有这个方法的定义, 而且Thrift不仅仅自动产生java interface, 而且还提供整个RPC client端的实现
public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options); recv_submitTopologyWithOpts(); }分两步,
首先send_submitTopologyWithOpts, 调用sendBase
接着, recv_submitTopologyWithOpts, 调用receiveBase
protected void sendBase(String methodName, TBase args) throws TException { oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); protected void receiveBase(TBase result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin(); if (msg.type == TMessageType.EXCEPTION) { TApplicationException x = TApplicationException.read(iprot_); iprot_.readMessageEnd(); throw x; if (msg.seqid != seqid_) { throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response"); result.read(iprot_); iprot_.readMessageEnd(); }可以看出Thrift对protocol的封装, 不需要自己处理序列化, 调用protocol的接口搞定
3 Server
Thrift强大的地方是, 实现了整个协议栈而不光只是IDL的转化, 对于server也给出多种实现
下面看看在nimbus server端, 是用clojure来写的
可见其中使用Thrift封装的NonblockingServerSocket, THsHaServer, TBinaryProtocol, Proccessor, 非常简单
其中processor会使用service-handle来处理recv到的数据, 所以作为使用者只需要在service-handle中实现Nimbus$Iface, 其他和server相关的, Thrift都已经帮你封装好了, 这里使用的IDL也在backtype.storm.generated, 因为clojure基于JVM所以IDL只需要转化成Java即可.
(defn launch-server! [conf nimbus] (validate-distributed-mode! conf) (let [service-handler (service-handler conf nimbus) options (- (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) (.processor (Nimbus$Processor. service-handler)) server (THsHaServer. options)] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) (log-message "Starting Nimbus server...") (.serve server)))
本文章摘自博客园,原文发布日期:2013-06-04
HBase thrift/thrift2 使用指南 Thrift server简介 Thrift server是HBase中的一种服务,主要用于对多语言API的支持。基于Apache Thrift(多语言支持的通信框架)开发,目前有两种版本thrift和thrift2。
相关文章
- C# DateTime的11种构造函数 [Abp 源码分析]十五、自动审计记录 .Net 登陆的时候添加验证码 使用Topshelf开发Windows服务、记录日志 日常杂记——C#验证码 c#_生成图片式验证码 C# 利用SharpZipLib生成压缩包 Sql2012如何将远程服务器数据库及表、表结构、表数据导入本地数据库
- 深入理解Spark:核心思想与源码分析. 导读
- Storm-源码分析- timer (backtype.storm.timer)
- Storm-源码分析-Stats (backtype.storm.stats)
- Storm-源码分析- metric
- Storm-源码分析- hook (backtype.storm.hooks)
- Storm-源码分析-Topology Submit-Task
- Storm-源码分析- bolt (backtype.storm.task)
- Storm-源码分析-Topology Submit-Executor
- Spark源码分析 – DAGScheduler
- Spark源码分析 – BlockManager
- Spark MLlib - Decision Tree源码分析
- ArrayList相关方法介绍及源码分析
- JS axios cancelToken 是如何实现取消请求?稍有啰嗦但超有耐心的 axios 源码分析
- PHP全开源京东淘宝唯品会自动抢单系统源码
- Spring源码分析(七)扩展接口BeanPostProcessors源码分析
- 《MapReduce 2.0源码分析与编程实战》一1.6 本章小结
- 《MapReduce 2.0源码分析与编程实战》一导读
- AQS源码探究_02 AQS简介及属性分析
- SwiftUI 3 新属性构建完整Todo List App待办事项应用程序 (教程含源码)
- 如何在 reactJS 中制作天气网络应用程序(教程含源码)
- 12.1 ROS NavFn全局规划源码_1
- Spring 源码分析之 bean 依赖注入原理(注入属性)
- JDK源码选读
- lucene源码分析(1)基本要素
- cglib源码分析--转
- Java实现Opencv源码以及启动appium的两种方式