Storm Source Code Analysis: Using Thrift

2023-09-11 14:16:09

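First there is storm.thrift, the IDL file that defines the data structures and services Storm uses.
Then there is the backtype.storm.generated package, which holds the Java code that Thrift generates automatically from the IDL.

Take the Nimbus service as an example.
Its definition in the IDL is: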

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  // can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();

  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  // returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

The corresponding Java code in Nimbus.java is:

public class Nimbus {

    public interface Iface {
        public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
        public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public String beginFileUpload() throws org.apache.thrift7.TException;
        public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
        public void finishFileUpload(String location) throws org.apache.thrift7.TException;
        public String beginFileDownload(String file) throws org.apache.thrift7.TException;
        public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
        public String getNimbusConf() throws org.apache.thrift7.TException;
        public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
        public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
        public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    }

    // ... other generated members (Client, Processor, and so on) omitted
}
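The IDL comment on downloadChunk says a caller can stop once a zero-length byte array comes back, and the generated interface exposes that `binary` value as a ByteBuffer. A minimal sketch of such a download loop follows. It uses only the JDK: `ChunkSource` and `inMemory` are hypothetical stand-ins for the remote service, not Storm or Thrift classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class DownloadLoopSketch {
    // Hypothetical one-method view of the download side of the interface.
    interface ChunkSource {
        ByteBuffer downloadChunk(String id);
    }

    // Keep requesting chunks until a zero-length one arrives, then return all bytes read.
    static byte[] downloadAll(ChunkSource source, String id) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = source.downloadChunk(id);
            int n = chunk.remaining();
            if (n == 0) {
                break; // empty chunk signals end of file
            }
            byte[] bytes = new byte[n];
            chunk.get(bytes);
            out.write(bytes, 0, n);
        }
        return out.toByteArray();
    }

    // In-memory source that serves data in pieces of at most chunkSize bytes (the id is ignored).
    static ChunkSource inMemory(byte[] data, int chunkSize) {
        int[] pos = {0};
        return id -> {
            int n = Math.min(chunkSize, data.length - pos[0]);
            ByteBuffer b = ByteBuffer.wrap(data, pos[0], n);
            pos[0] += n;
            return b;
        };
    }

    public static void main(String[] args) {
        byte[] jar = "pretend this is a topology jar".getBytes(StandardCharsets.UTF_8);
        byte[] copy = downloadAll(inMemory(jar, 8), "download-1");
        System.out.println(Arrays.equals(jar, copy));
    }
}
```

With a real client, the id would come from beginFileDownload and each call could also throw TException, which this sketch leaves out.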

 

2 Client

1. First, get a client:

NimbusClient client = NimbusClient.getConfiguredClient(conf);

Look at the logic of NimbusClient.getConfiguredClient in backtype.storm.utils.
It simply reads the Nimbus host and port from the configuration and constructs a new NimbusClient:

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            // read the Nimbus address from the configuration
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }
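The lookup above goes through Utils.getInt rather than a plain cast because values in an untyped configuration Map are not guaranteed to be Integer. A minimal JDK-only sketch of the same idea follows. The key strings, the `toInt` helper, and the example host are assumptions made for illustration; the article only refers to the keys through Config constants.

```java
import java.util.HashMap;
import java.util.Map;

class NimbusAddressSketch {
    // Assumed key names; the source only shows Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT.
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    // Convert a loosely typed config value to int (hypothetical helper in the spirit of Utils.getInt).
    static int toInt(Object o) {
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        throw new IllegalArgumentException("Cannot convert " + o + " to int");
    }

    // Build "host:port" from the configuration map.
    static String nimbusAddress(Map<String, Object> conf) {
        String host = (String) conf.get(NIMBUS_HOST);
        int port = toInt(conf.get(NIMBUS_THRIFT_PORT));
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(NIMBUS_HOST, "nimbus.example.com");
        // Numeric values may arrive as Long or Integer depending on how the config was parsed.
        conf.put(NIMBUS_THRIFT_PORT, 6627L);
        System.out.println(nimbusAddress(conf));
    }
}
```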

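NimbusClient extends ThriftClient (public class NimbusClient extends ThriftClient).
So what does ThriftClient do? The key questions are how data is serialized and how it is transmitted to the remote side.
This is where Thrift's Transport and Protocol abstractions come in.
The Transport is essentially a wrapper around a socket, created with TSocket(host, port).
For the Protocol, TBinaryProtocol is used as the default when nothing else is specified.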

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            //locate login configuration
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            //construct a transport plugin
            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            //create a socket with server
            if (host == null) {
                throw new IllegalArgumentException("host is not set");
            }
            if (port <= 0) {
                throw new IllegalArgumentException("invalid port: " + port);
            }
            TSocket socket = new TSocket(host, port);
            if (timeout != null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            //establish client-server transport via plugin
            _transport = transportPlugin.connect(underlyingTransport, host);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new TBinaryProtocol(_transport);
    }
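To make the Transport/Protocol split concrete, here is a minimal JDK-only sketch of the layering. It is not Thrift code and does not reproduce Thrift's wire format: `BinaryWriter` and `BinaryReader` are hypothetical stand-ins for a protocol, and an in-memory byte stream stands in for the socket transport.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LayeringSketch {
    // "Protocol" layer: decides how values become bytes (big-endian ints, length-prefixed UTF-8 strings).
    static final class BinaryWriter {
        private final DataOutputStream out;
        BinaryWriter(OutputStream transport) { this.out = new DataOutputStream(transport); }
        void writeI32(int v) throws IOException { out.writeInt(v); }
        void writeString(String s) throws IOException {
            byte[] b = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(b.length);
            out.write(b);
        }
        void flush() throws IOException { out.flush(); }
    }

    static final class BinaryReader {
        private final DataInputStream in;
        BinaryReader(InputStream transport) { this.in = new DataInputStream(transport); }
        int readI32() throws IOException { return in.readInt(); }
        String readString() throws IOException {
            byte[] b = new byte[in.readInt()];
            in.readFully(b);
            return new String(b, StandardCharsets.UTF_8);
        }
    }

    // Write a string and an int through the protocol, then read them back from the same bytes.
    static String roundTrip(String name, int port) {
        try {
            // "Transport" layer: only moves bytes; here a buffer replaces the socket.
            ByteArrayOutputStream transport = new ByteArrayOutputStream();
            BinaryWriter writer = new BinaryWriter(transport);
            writer.writeString(name);
            writer.writeI32(port);
            writer.flush();

            BinaryReader reader = new BinaryReader(new ByteArrayInputStream(transport.toByteArray()));
            return reader.readString() + ":" + reader.readI32();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("word-count", 6627));
    }
}
```

Because the writer only sees an OutputStream, swapping the buffer for a socket stream would not change the encoding code, which is the same separation the constructor above relies on.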

 

2. Call any RPC.
Take submitTopologyWithOpts as an example:

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

The Nimbus interface shown earlier declares this method. Thrift generates not only the Java interface but also the complete client-side RPC implementation:

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException {
        send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
        recv_submitTopologyWithOpts();
    }

The call takes two steps.
First, send_submitTopologyWithOpts calls sendBase.
Then, recv_submitTopologyWithOpts calls receiveBase.

    protected void sendBase(String methodName, TBase args) throws TException {
        // write the message header, then the arguments, then flush to the transport
        oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
        args.write(oprot_);
        oprot_.writeMessageEnd();
        oprot_.getTransport().flush();
    }

    protected void receiveBase(TBase result, String methodName) throws TException {
        TMessage msg = iprot_.readMessageBegin();
        // the server reported an application-level failure
        if (msg.type == TMessageType.EXCEPTION) {
            TApplicationException x = TApplicationException.read(iprot_);
            iprot_.readMessageEnd();
            throw x;
        }
        // the reply must match the sequence id of the request just sent
        if (msg.seqid != seqid_) {
            throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
        }
        result.read(iprot_);
        iprot_.readMessageEnd();
    }

This shows how Thrift encapsulates the protocol: you never handle serialization yourself, you just call the protocol's interface.
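The two checks in receiveBase (message type and sequence id) can be sketched in isolation. The following JDK-only example uses a simplified envelope of method name, type byte, and sequence id; the layout, the `EnvelopeSketch` class, and its method names are assumptions for illustration and do not match Thrift's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class EnvelopeSketch {
    static final byte CALL = 1, REPLY = 2, EXCEPTION = 3;

    private int seqid = 0;

    // Encode a simplified message header: method name, type, sequence id.
    static byte[] encode(String method, byte type, int seqid) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(method);
            out.writeByte(type);
            out.writeInt(seqid);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Like sendBase: bump the sequence id and produce the outgoing CALL header.
    byte[] send(String method) {
        return encode(method, CALL, ++seqid);
    }

    // Like receiveBase: reject server-side exceptions and replies with the wrong sequence id.
    void receive(byte[] response) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(response));
            String method = in.readUTF();
            byte type = in.readByte();
            int replySeqid = in.readInt();
            if (type == EXCEPTION) {
                throw new IllegalStateException(method + " failed: server reported an exception");
            }
            if (replySeqid != seqid) {
                throw new IllegalStateException(method + " failed: out of sequence response");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EnvelopeSketch client = new EnvelopeSketch();
        client.send("getNimbusConf");                       // seqid becomes 1
        client.receive(encode("getNimbusConf", REPLY, 1));  // matching reply is accepted
        try {
            client.receive(encode("getNimbusConf", REPLY, 42));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```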

 

3 Server

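What makes Thrift powerful is that it implements the whole protocol stack rather than only translating the IDL, and it ships several server implementations as well.
The Nimbus server side, shown below, is written in Clojure.
It uses Thrift's TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the result is very simple.
The processor uses service-handler to handle the data it receives, so as a user you only need to implement Nimbus$Iface in service-handler; Thrift already wraps everything else related to the server. The code generated from the IDL is again the one in backtype.storm.generated: because Clojure runs on the JVM, the IDL only needs to be compiled to Java.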

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        ;; thread the server socket through the THsHaServer$Args builder calls
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler)))
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
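The division of labor described above, where the processor decodes a call and hands it to a user-written handler, can be sketched without Thrift. In this JDK-only example the trimmed `Iface`, the `Processor`, and the `Handler` are hypothetical simplifications: the generated Nimbus$Processor works on protocol objects rather than plain strings, and the JSON values returned are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class DispatchSketch {
    // Trimmed, hypothetical version of the service interface: two methods, String arguments only.
    interface Iface {
        String getNimbusConf();
        String getTopologyConf(String id);
    }

    // Stand-in for a generated processor: maps an incoming method name to a handler call.
    static final class Processor {
        private final Map<String, Function<String, String>> methods = new HashMap<>();

        Processor(Iface handler) {
            methods.put("getNimbusConf", arg -> handler.getNimbusConf());
            methods.put("getTopologyConf", handler::getTopologyConf);
        }

        String process(String method, String arg) {
            Function<String, String> fn = methods.get(method);
            if (fn == null) {
                throw new IllegalArgumentException("Invalid method name: '" + method + "'");
            }
            return fn.apply(arg);
        }
    }

    // The only part a service author writes: the business logic behind the interface.
    static final class Handler implements Iface {
        public String getNimbusConf() {
            return "{\"nimbus.thrift.port\":6627}";
        }
        public String getTopologyConf(String id) {
            return "{\"topology.id\":\"" + id + "\"}";
        }
    }

    public static void main(String[] args) {
        Processor processor = new Processor(new Handler());
        System.out.println(processor.process("getTopologyConf", "word-count-1"));
    }
}
```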


This article is excerpted from cnblogs (博客园). Original publication date: 2013-06-04
