zl程序教程

您现在的位置是:首页 >  其他

当前栏目

使用thrift实现订阅服务和发布服务详解编程语言

服务编程语言 实现 使用 详解 发布 订阅 thrift
2023-06-13 09:11:49 时间
服务:订阅服务 market_subscriber 和 发布服务 market_publisher

功能:market_subscriber 能够向 market_publisher 请求订阅某些类型的消息,当 market_publisher 有该类型的消息时,需要把它推送给订阅服务。

流程:1. market_publisher 服务启动,监听端口,注册事件处理函数,处理订阅请求消息。

         2. market_subscriber 服务启动,监听端口,注册事件处理函数,处理接收推动来的消息。

         3. market_subscriber 向 market_publisher 发起订阅请求,market_publisher 根据订阅请求参数,长连接 market_subscriber 提供的消息接收端口。

         4. market_publisher 通过长连接向 market_subscriber 推送消息。

注意:1. market_publisher 到 market_subscriber 的长连接的维护:

            (1)market_subscriber 一定时间内未收到 market_publisher 的推送消息,尝试重新发起订阅请求。

            (2)market_publisher 推送订阅消息时,发现连接断开,尝试重连。 考虑 market_publisher 有重启的情况,收到的订阅请求参数需要做持久化。 == TODO:增加一个 market_subscriber 到 market_publisher 的取消订阅的请求。

实现:

1. market_subscriber.thrift    market_subscriber 实现的服务接口

namespace cpp market_subscriber

namespace java market_subscriber

namespace perl market_subscriber

namespace php market_subscriber

struct Snapshot

{

 1: i32 nSecurityID; ///  证券ID

 2: i32 nTime; ///  序号/时间/日期 HHMMSSmmm

 3: i32 nTradingPhaseCode; ///  0:开市前 1:开盘集合竞价 2:连续竞价 3:临时停盘 4:收盘集合竞价 5:集中竞价闭市 6:协议转让结束 7:闭市

 4: i32 nPreClosePx; ///  昨收价 * 10000

 5: i32 nOpenPx; ///  开盘价 ..

 6: i32 nHighPx; ///  最高价 ..

 7: i32 nLowPx; ///  最低价 ..

 8: i32 nLastPx; ///  最新价 ..

 9: i64 llTradeNum; ///  成交笔数

 10: i64 llVolume; ///  成交量

 11: i64 llValue; ///  成交额(*10000)

service SubscriberService

{

 // 推送消息

 // Oneway means the client only makes request and does not listen for any response at all. Oneway methods must be void.

 oneway void sendSnapshot(1:list Snapshot  lstSnapshot);

}

2. market_publisher.thrift  market_publisher 实现的服务接口

namespace cpp market_publisher

namespace java market_publisher

namespace perl market_publisher

namespace php market_publisher

struct SubscribeMarketParam

{

 1: string user_name;

 2: string password;

 3: i32 type; // 订阅类型

 4: string ip; // 接收推送数据的ip

 5: i16 port; // 接收推送数据的port

}

struct SubscribeMarketAck

{

 1: required i32 error_code; // 0,成功; 其它,失败

 2: optional string error_info;

}

struct GetStockBaseInfoParam

{

 1: required i32 stock_code;

}

struct GetStockBaseInfoAck

{

 1: required i32 error_code;

 2: optional string error_info;

 3: optional string stock_name;

}

service PublisherService

{

 // 订阅请求:订阅行情信息

 SubscribeMarketAck subscribeMarket(1:SubscribeMarketParam param);

 GetStockBaseInfoAck getStockBaseInfo(1:GetStockBaseInfoParam param);

}

3. subscriber_server.cpp

/*

 * Main.cpp

 */

#include  ../gen-cpp/SubscriberService.h

#include  ../gen-cpp/PublisherService.h

#include  thrift/protocol/TBinaryProtocol.h

#include  thrift/server/TSimpleServer.h

#include  thrift/transport/TSocket.h

#include  thrift/transport/TServerSocket.h

#include  thrift/transport/TBufferTransports.h

#include  boost/thread/thread.hpp

using namespace ::apache::thrift;

using namespace ::apache::thrift::protocol;

using namespace ::apache::thrift::transport;

using namespace ::apache::thrift::server;

using boost::shared_ptr;

using namespace ::market_subscriber;

using namespace ::market_publisher;

class SubscriberServiceHandler : virtual public SubscriberServiceIf {

 public:

 SubscriberServiceHandler() {

 // Your initialization goes here

 }

 void sendSnapshot(const std::vector Snapshot    lstSnapshot) {

 // Your implementation goes here

 printf( sendSnapshot/n );

 std::cout    Received snapshots  number:     lstSnapshot.size()   std::endl;

 for (std::vector Snapshot ::const_iterator iter = lstSnapshot.begin();

 iter != lstSnapshot.end(); iter++)

 {

 std::cout    nSecurityID:     iter- nSecurityID    /t

 std::cout    nTime:     iter- nTime    /t

 std::cout   std::endl;

 }

 sleep(10);

 }

};

const char* CLIENT_LISTNE_IP =  127.0.0.1

const short CLIENT_LISTNE_PORT = 9060;

void subscriberServiceThread()

{

 shared_ptr SubscriberServiceHandler  handler(new SubscriberServiceHandler());

 shared_ptr TProcessor  processor(new SubscriberServiceProcessor(handler));

 shared_ptr TServerTransport  serverTransport(new TServerSocket(CLIENT_LISTNE_PORT));

 shared_ptr TTransportFactory  transportFactory(new TBufferedTransportFactory());

 shared_ptr TProtocolFactory  protocolFactory(new TBinaryProtocolFactory());

 TSimpleServer server(processor, serverTransport, transportFactory,

 protocolFactory);

 server.serve();

}

int main(int argc, char **argv)

{

 boost::thread thrd( subscriberServiceThread);

 // wait for subscriberServiceThread ready

 sleep(3);

 boost::shared_ptr TSocket  socket(new TSocket( 127.0.0.1 , 9090));

 boost::shared_ptr TTransport  transport(new TBufferedTransport(socket));

 boost::shared_ptr TProtocol  protocol(new TBinaryProtocol(transport));

 PublisherServiceClient client(protocol);

 try

 {

 transport- open();

 SubscribeMarketAck ack;

 SubscribeMarketParam param;

 param.__set_user_name( mazhan );

 param.__set_password( 123456 );

 param.__set_type(0);

 param.__set_ip(CLIENT_LISTNE_IP);

 param.__set_port(CLIENT_LISTNE_PORT);

 client.subscribeMarket(ack, param);

 std::cout    subscribeMarket(), error code:     ack.error_code   std::endl;

 transport- close();

 }

 catch (TException  tx)

 {

 std::cout    ERROR:     tx.what()   std::endl;

 }

 thrd.join();

 return 0;

}

4. pubsher_server.cpp

/*

 * Main.cpp

 */

#include  ../gen-cpp/SubscriberService.h

#include  ../gen-cpp/PublisherService.h

#include  thrift/protocol/TBinaryProtocol.h

#include  thrift/server/TSimpleServer.h

#include  thrift/transport/TSocket.h

#include  thrift/transport/TServerSocket.h

#include  thrift/transport/TBufferTransports.h

#include  thrift/concurrency/ThreadManager.h

#include  boost/thread/thread.hpp

using namespace ::apache::thrift;

using namespace ::apache::thrift::protocol;

using namespace ::apache::thrift::transport;

using namespace ::apache::thrift::server;

using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

using namespace ::market_publisher;

using namespace ::market_subscriber;

std::list boost::shared_ptr SubscriberServiceClient    g_lstSubscriberServiceClient;

boost::mutex g_mutexSubscriberServiceClient;

class PublisherServiceHandler : virtual public PublisherServiceIf {

 public:

 PublisherServiceHandler() {

 // Your initialization goes here

 }

 void subscribeMarket(SubscribeMarketAck  _return, const SubscribeMarketParam  param) {

 // Your implementation goes here

 std::cout    subscribeMarket, ip=    param.ip    , port=    param.port    .    std::endl;

 boost::shared_ptr TSocket  socket(new TSocket(param.ip, param.port));

 boost::shared_ptr TTransport  transport(new TBufferedTransport(socket));

 boost::shared_ptr TProtocol  protocol(new TBinaryProtocol(transport));

 boost::shared_ptr SubscriberServiceClient  client(new SubscriberServiceClient(protocol));

 int error_code = 1; // fail to open

 try

 {

 transport- open();

 error_code = 0;

 { // add to subscribes list

 boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);

 g_lstSubscriberServiceClient.push_back(client);

 }

 }

 catch (TException  e)

 {

 std::cout    Exception:     e.what()   std::endl;

 _return.__set_error_info(e.what());

 }

 catch (std::exception  e)

 {

 std::cout    Exception:     e.what()   std::endl;

 _return.__set_error_info(e.what());

 }

 catch (
使用thrift实现订阅服务和发布服务详解编程语言)

 {

 char buff[100];

 snprintf(buff, 99,  fail to open %s:%d. , param.ip.c_str(), param.port);

 std::cout    Exception:     buff   std::endl;

 _return.__set_error_info(buff);

 }

 _return.__set_error_code(error_code);

 }

 void getStockBaseInfo(GetStockBaseInfoAck  _return, const GetStockBaseInfoParam  param) {

 // Your implementation goes here

 printf( getStockBaseInfo/n );

 }

};

int32_t getCurTime()

{

 time_t t = time(0);

 char tmp[64];

 strftime(tmp, sizeof(tmp),  %H%M%S , localtime( t));

 return atoi(tmp);

}

// send markets to subscribers.

void publisherServiceThread()

{

 while (1)

 {

 std::vector Snapshot  lstSnapshot;

 Snapshot snapshot;

 snapshot.nSecurityID = 100000001;

 snapshot.nTime = getCurTime() * 1000 + rand() % 1000;

 snapshot.nTradingPhaseCode = 2;

 snapshot.nPreClosePx = 240500;

 snapshot.nOpenPx = 250500;

 snapshot.nHighPx = 250800;

 snapshot.nLowPx = 240800;

 snapshot.nLastPx = 250300;

 snapshot.llTradeNum = 15000;

 snapshot.llVolume = 6000000;

 snapshot.llValue = 15030000000;

 lstSnapshot.push_back(snapshot);

 boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);

 std::list boost::shared_ptr SubscriberServiceClient   ::iterator iter = g_lstSubscriberServiceClient.begin();

 while (iter != g_lstSubscriberServiceClient.end())

 {

 try

 {

 (*iter)- sendSnapshot(lstSnapshot);

 iter++;

 }

 catch (TException  e)

 {

 std::cout    Exception:     e.what()   std::endl;

 iter = g_lstSubscriberServiceClient.erase(iter);

 }

 catch (std::exception  e)

 {

 std::cout    Exception:     e.what()   std::endl;

 iter = g_lstSubscriberServiceClient.erase(iter);

 }

 catch (
使用thrift实现订阅服务和发布服务详解编程语言)

 {

 std::cout    Exception: fail to call sendSnapshot().    std::endl;

 iter = g_lstSubscriberServiceClient.erase(iter);

 }

 }

 sleep(3);

 }

}

int main(int argc, char **argv)

{

 int port = 9090;

 shared_ptr PublisherServiceHandler  handler(new PublisherServiceHandler());

 shared_ptr TProcessor  processor(new PublisherServiceProcessor(handler));

 shared_ptr TServerTransport  serverTransport(new TServerSocket(port));

 shared_ptr TTransportFactory  transportFactory(

 new TBufferedTransportFactory());

 shared_ptr TProtocolFactory  protocolFactory(new TBinaryProtocolFactory());

 TSimpleServer server(processor, serverTransport, transportFactory,

 protocolFactory);

 boost::thread thrd( publisherServiceThread);

 printf( Starting the server
使用thrift实现订阅服务和发布服务详解编程语言/n );

 server.serve();

 printf( done./n );

 return 0;

}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/18486.html

cgojavaphp