使用thrift实现订阅服务和发布服务详解编程语言
功能:market_subscriber 能够向 market_publisher 请求订阅某些类型的消息,当 market_publisher 有该类型的消息时,需要把它推送给订阅服务。
流程:1. market_publisher 服务启动,监听端口,注册事件处理函数,处理订阅请求消息。
2. market_subscriber 服务启动,监听端口,注册事件处理函数,处理接收推动来的消息。
3. market_subscriber 向 market_publisher 发起订阅请求,market_publisher 根据订阅请求参数,长连接 market_subscriber 提供的消息接收端口。
4. market_publisher 通过长连接向 market_subscriber 推送消息。
注意:1. market_publisher 到 market_subscriber 的长连接的维护:
(1)market_subscriber 一定时间内未收到 market_publisher 的推送消息,尝试重新发起订阅请求。
(2)market_publisher 推送订阅消息时,发现连接断开,尝试重连。 考虑 market_publisher 有重启的情况,收到的订阅请求参数需要做持久化。 == TODO:增加一个 market_subscriber 到 market_publisher 的取消订阅的请求。
实现:
1. market_subscriber.thrift market_subscriber 实现的服务接口
namespace java market_subscriber
namespace perl market_subscriber
namespace php market_subscriber
struct Snapshot
{
1: i32 nSecurityID; /// 证券ID
2: i32 nTime; /// 序号/时间/日期 HHMMSSmmm
3: i32 nTradingPhaseCode; /// 0:开市前 1:开盘集合竞价 2:连续竞价 3:临时停盘 4:收盘集合竞价 5:集中竞价闭市 6:协议转让结束 7:闭市
4: i32 nPreClosePx; /// 昨收价 * 10000
5: i32 nOpenPx; /// 开盘价 ..
6: i32 nHighPx; /// 最高价 ..
7: i32 nLowPx; /// 最低价 ..
8: i32 nLastPx; /// 最新价 ..
9: i64 llTradeNum; /// 成交笔数
10: i64 llVolume; /// 成交量
11: i64 llValue; /// 成交额(*10000)
}
service SubscriberService
{
// 推送消息
// Oneway means the client only makes request and does not listen for any response at all. Oneway methods must be void.
oneway void sendSnapshot(1:list Snapshot lstSnapshot);
}
2. market_publisher.thrift market_publisher 实现的服务接口
namespace java market_publisher
namespace perl market_publisher
namespace php market_publisher
struct SubscribeMarketParam
{
1: string user_name;
2: string password;
3: i32 type; // 订阅类型
4: string ip; // 接收推送数据的ip
5: i16 port; // 接收推送数据的port
}
struct SubscribeMarketAck
{
1: required i32 error_code; // 0,成功; 其它,失败
2: optional string error_info;
}
struct GetStockBaseInfoParam
{
1: required i32 stock_code;
}
struct GetStockBaseInfoAck
{
1: required i32 error_code;
2: optional string error_info;
3: optional string stock_name;
}
service PublisherService
{
// 订阅请求:订阅行情信息
SubscribeMarketAck subscribeMarket(1:SubscribeMarketParam param);
GetStockBaseInfoAck getStockBaseInfo(1:GetStockBaseInfoParam param);
}
3. subscriber_server.cpp
* Main.cpp
*/
#include ../gen-cpp/SubscriberService.h
#include ../gen-cpp/PublisherService.h
#include thrift/protocol/TBinaryProtocol.h
#include thrift/server/TSimpleServer.h
#include thrift/transport/TSocket.h
#include thrift/transport/TServerSocket.h
#include thrift/transport/TBufferTransports.h
#include boost/thread/thread.hpp
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace ::market_subscriber;
using namespace ::market_publisher;
class SubscriberServiceHandler : virtual public SubscriberServiceIf {
public:
SubscriberServiceHandler() {
// Your initialization goes here
}
void sendSnapshot(const std::vector Snapshot lstSnapshot) {
// Your implementation goes here
printf( sendSnapshot/n );
std::cout Received snapshots number: lstSnapshot.size() std::endl;
for (std::vector Snapshot ::const_iterator iter = lstSnapshot.begin();
iter != lstSnapshot.end(); iter++)
{
std::cout nSecurityID: iter- nSecurityID /t
std::cout nTime: iter- nTime /t
std::cout std::endl;
}
sleep(10);
}
};
const char* CLIENT_LISTNE_IP = 127.0.0.1
const short CLIENT_LISTNE_PORT = 9060;
void subscriberServiceThread()
{
shared_ptr SubscriberServiceHandler handler(new SubscriberServiceHandler());
shared_ptr TProcessor processor(new SubscriberServiceProcessor(handler));
shared_ptr TServerTransport serverTransport(new TServerSocket(CLIENT_LISTNE_PORT));
shared_ptr TTransportFactory transportFactory(new TBufferedTransportFactory());
shared_ptr TProtocolFactory protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory,
protocolFactory);
server.serve();
}
int main(int argc, char **argv)
{
boost::thread thrd( subscriberServiceThread);
// wait for subscriberServiceThread ready
sleep(3);
boost::shared_ptr TSocket socket(new TSocket( 127.0.0.1 , 9090));
boost::shared_ptr TTransport transport(new TBufferedTransport(socket));
boost::shared_ptr TProtocol protocol(new TBinaryProtocol(transport));
PublisherServiceClient client(protocol);
try
{
transport- open();
SubscribeMarketAck ack;
SubscribeMarketParam param;
param.__set_user_name( mazhan );
param.__set_password( 123456 );
param.__set_type(0);
param.__set_ip(CLIENT_LISTNE_IP);
param.__set_port(CLIENT_LISTNE_PORT);
client.subscribeMarket(ack, param);
std::cout subscribeMarket(), error code: ack.error_code std::endl;
transport- close();
}
catch (TException tx)
{
std::cout ERROR: tx.what() std::endl;
}
thrd.join();
return 0;
}
4. pubsher_server.cpp
* Main.cpp
*/
#include ../gen-cpp/SubscriberService.h
#include ../gen-cpp/PublisherService.h
#include thrift/protocol/TBinaryProtocol.h
#include thrift/server/TSimpleServer.h
#include thrift/transport/TSocket.h
#include thrift/transport/TServerSocket.h
#include thrift/transport/TBufferTransports.h
#include thrift/concurrency/ThreadManager.h
#include boost/thread/thread.hpp
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace ::market_publisher;
using namespace ::market_subscriber;
std::list boost::shared_ptr SubscriberServiceClient g_lstSubscriberServiceClient;
boost::mutex g_mutexSubscriberServiceClient;
class PublisherServiceHandler : virtual public PublisherServiceIf {
public:
PublisherServiceHandler() {
// Your initialization goes here
}
void subscribeMarket(SubscribeMarketAck _return, const SubscribeMarketParam param) {
// Your implementation goes here
std::cout subscribeMarket, ip= param.ip , port= param.port . std::endl;
boost::shared_ptr TSocket socket(new TSocket(param.ip, param.port));
boost::shared_ptr TTransport transport(new TBufferedTransport(socket));
boost::shared_ptr TProtocol protocol(new TBinaryProtocol(transport));
boost::shared_ptr SubscriberServiceClient client(new SubscriberServiceClient(protocol));
int error_code = 1; // fail to open
try
{
transport- open();
error_code = 0;
{ // add to subscribes list
boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);
g_lstSubscriberServiceClient.push_back(client);
}
}
catch (TException e)
{
std::cout Exception: e.what() std::endl;
_return.__set_error_info(e.what());
}
catch (std::exception e)
{
std::cout Exception: e.what() std::endl;
_return.__set_error_info(e.what());
}
catch (
)
{
char buff[100];
snprintf(buff, 99, fail to open %s:%d. , param.ip.c_str(), param.port);
std::cout Exception: buff std::endl;
_return.__set_error_info(buff);
}
_return.__set_error_code(error_code);
}
void getStockBaseInfo(GetStockBaseInfoAck _return, const GetStockBaseInfoParam param) {
// Your implementation goes here
printf( getStockBaseInfo/n );
}
};
int32_t getCurTime()
{
time_t t = time(0);
char tmp[64];
strftime(tmp, sizeof(tmp), %H%M%S , localtime( t));
return atoi(tmp);
}
// send markets to subscribers.
void publisherServiceThread()
{
while (1)
{
std::vector Snapshot lstSnapshot;
Snapshot snapshot;
snapshot.nSecurityID = 100000001;
snapshot.nTime = getCurTime() * 1000 + rand() % 1000;
snapshot.nTradingPhaseCode = 2;
snapshot.nPreClosePx = 240500;
snapshot.nOpenPx = 250500;
snapshot.nHighPx = 250800;
snapshot.nLowPx = 240800;
snapshot.nLastPx = 250300;
snapshot.llTradeNum = 15000;
snapshot.llVolume = 6000000;
snapshot.llValue = 15030000000;
lstSnapshot.push_back(snapshot);
boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);
std::list boost::shared_ptr SubscriberServiceClient ::iterator iter = g_lstSubscriberServiceClient.begin();
while (iter != g_lstSubscriberServiceClient.end())
{
try
{
(*iter)- sendSnapshot(lstSnapshot);
iter++;
}
catch (TException e)
{
std::cout Exception: e.what() std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
catch (std::exception e)
{
std::cout Exception: e.what() std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
catch (
)
{
std::cout Exception: fail to call sendSnapshot(). std::endl;
iter = g_lstSubscriberServiceClient.erase(iter);
}
}
sleep(3);
}
}
int main(int argc, char **argv)
{
int port = 9090;
shared_ptr PublisherServiceHandler handler(new PublisherServiceHandler());
shared_ptr TProcessor processor(new PublisherServiceProcessor(handler));
shared_ptr TServerTransport serverTransport(new TServerSocket(port));
shared_ptr TTransportFactory transportFactory(
new TBufferedTransportFactory());
shared_ptr TProtocolFactory protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory,
protocolFactory);
boost::thread thrd( publisherServiceThread);
printf( Starting the server
/n );
server.serve();
printf( done./n );
return 0;
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/18486.html
cgojavaphp相关文章
- Spring Cloud 微服务优雅下线 + 灰度发布的正确姿势,写得太好了!
- 服务架构开发实战:熔断与降级的区别、如何集成Hystrix
- 微服务开发的12项要素详解编程语言
- Python创建Windows 服务详解编程语言
- ScalaPB(2): 在scala中用gRPC实现微服务详解编程语言
- ScalaPB(0): 找寻合适的内部系统微服务集成工具详解编程语言
- xinetd服务详解编程语言
- Spring Cloud(十三):Spring Cloud Sleuth服务链路追踪(zipkin)详解编程语言
- Python 缓存服务详解编程语言
- 服务Oracle NTP服务:实现时钟同步功能(oraclentp)
- Spring Boot(十):邮件服务详解编程语言
- 服务订单SO创建详解编程语言
- php开发web服务原理详解编程语言
- spring web.xml配置服务启动后执行文件详解编程语言
- Zookeeper——分布式协调服务Zookeeper介绍详解编程语言
- 找靓机回收,坚持用服务赢得用户信任
- 服务 探索Linux系统中NTP服务的实现(linux查看ntp)
- 微众银行创新供应链金融服务模式 服务实体经济发展
- 一步步解决: Oracle 服务器卸载(oracle卸载服务)
- Linux系统下SNMP服务的配置指南(linux下snmp配置)
- 极客漫画:你准备好微服务了吗?
- 提升业绩:Oracle服务支持让你轻松到达成功(oracle服务支持)
- 用CMD命令查看MySQL服务状态(cmd查看mysql服务)
- 重新让Redis服务再次启航(加载redis服务)
- 向医者致敬,为健康加油!京东健康上线“医疗服务月”活动
- 重启IIS服务的几种方法小结