zl程序教程

您现在的位置是:首页 >  后端

当前栏目

让Storm插上CEP的翅膀 - Siddhi调研和集成

集成 storm 调研 翅膀 插上
2023-09-11 14:16:09 时间
什么是 Siddhi?

Siddhi 是一种 lightweight, easy-to-use, open source CEP(Complex Event Processing)引擎,由wso2公司开发(http://wso2.com/about/)。

image

像绝大多数的 CEP 系统一样,Siddhi 支持对于流式数据的类 SQL 的查询,SQL 式的 query 通过 complier 翻译成 Java 代码。 
当一条数据流或多条数据流流入时,Siddhi Core 会实时的 check 当前数据流是否满足定义的 query,如果满足则触发 Callback 执行相应的逻辑。

Siddhi和传统的CEP系统,如Esper,相比区别? 
主要是比较轻量和高效,之所以可以达到更高的 performance,因为:

Multi-threading Queues and use of pipelining Nested queries and chaining streams Query optimization and common sub query elimination

尤其是前两点非常关键,传统的CEP系统,如果Esper,都是使用单线程去处理所有的 query matching,这样虽然简单,但是效率不高,无法利用 cpu 多核。 
所以 Siddhi 采用多线程,并且结合pipeline机制,如下图

image

Siddhi 将整个 query 切分成独立的 stages,即 processors,这样做的好处,首先是便于多线程化,再者,可以重用相同的 processor; 
而 processor 之间通过 queue 进行连接,这里就不详细描述了,有兴趣的同学可以去仔细看 Siddhi 的论文和文档。

 

Siddhi 能做什么?

下面我们就来看看,最关键的,Siddhi 可以为我们做什么?

这里就用几个形象的例子来说明 Siddhi 使用的典型的场景

简单 ETL

我们先用个最简单的例子,看看如果 run 一个真正的 Siddhi 例子,

上面说了,Siddhi 是用类 SQL 的查询语言,

首先需要先定义流的格式,

define stream TempStream (deviceID long, roomNo int, temp double);

然后定义查询,

from TempStream 

select roomNo, temp * 9/5 + 32 as temp, F as scale, roomNo = 100 and roomNo 110 as isServerRoom

insert into RoomTempStream;

这样就能实现一个完整的 ETL 过程, 
extraction,将需要的字段从 TempStream 里面 select 出来; 
transform, 将摄氏温度转换为华氏温度; 
loading,将结果输出到RoomTempStream流;

很方便,不用再另外写任何的逻辑,只需要写类SQL的Query语句。 
为了增加感性认识,我给出一个完成的 Java 测试例子,

复制代码
SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = "" +

 "ddefine stream TempStream (deviceID int, roomNo int, temp float);" +

 "" +

 "@info(name = query1) " +

 "from TempStream " +

 "select roomNo, temp * 9/5 + 32 as temp, F as scale, roomNo = 100 and roomNo 110 as isServerRoom " +

 "insert into RoomTempStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

executionPlanRuntime.addCallback("query1", new QueryCallback() {

 @Override

 public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {

 EventPrinter.print(timeStamp, inEvents, removeEvents);

InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");

executionPlanRuntime.start();

inputHandler.send(new Object[] {12344, 201, 28.2f});

inputHandler.send(new Object[] {12345, 202, 22.2f});
inputHandler.send(new Object[] {12346, 203, 24.2f});
//Shutting down the runtime

executionPlanRuntime.shutdown();

//Shutting down Siddhi

siddhiManager.shutdown();
复制代码

 

基于 window 聚合

Siddhi 支持很多中类型的 window,具体参考https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-time

这里给出最基本的,基于时间窗口的例子,

from TempStream#window.time(1 min)

select roomNo, avg(temp) as avgTemp

group by roomNo

insert all events into AvgRoomTempStream ;

这个查询会计算以1分钟为滑动窗口的,每个 room 的平均温度

Siddhi时间窗口也支持,按照外部的输入的时间进行聚合,但是它要求时间是必须递增的;这点我们brain的聚合库比它通用,可以适用于非单调递增的场景

 

多个流 Join

Siddhi 支持基于 window 的多个流的实时 join,

from TempStream[temp 30.0]#window.time(1 min) as T 

 join RegulatorStream[isOn == false]#window.length(1) as R

 on T.roomNo == R.roomNo

select T.roomNo, T.temp, R.deviceID, start as action

insert into RegulatorActionStream ;

上面的查询将,TempStream 和RegulatorStream 通过 roomNo 进行 join

 

Pattern Query

这种 query 最能表达出 CEP 的威力,什么是Pattern Query? 
“Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.”

直接看个例子,用 Pattern 查询来 detect credit card/ATM transaction frauds:

from every a1 = atmStatsStream[amountWithdrawed 100]

 - b1 = atmStatsStream[amountWithdrawed 10000 and a1.cardNo == b1.cardNo]

 within 1 day

select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as location, b1.cardHolderMobile as cardHolderMobile

insert into possibleFraudStream;

注意看到这个符号‘- ’,这个表示 event 发生顺序, 
上面这个查询的意思就是,在一天内,出现一次取现金额 100后,同一张卡,出现取现金额 10000,则认为可能是 fraud。

当然这只是个例子,不是说这样真的可以 detect fraud。你可以参照这个,写出更为复杂的查询。

 

Sequence Query

和 pattern 的区别是,pattern 的多个 event 之间可以是不连续的,但 sequence 的 events 之间必须是连续的。

我们可以看个例子,用 sequence 来发现股票价格的 peak:

from every e1=FilteredStockStream[price 20], 
e2=FilteredStockStream[((e2[last].price is null) and price =e1.price) or ((not (e2[last].price is null)) and price =e2[last].price)],
e3=FilteredStockStream[price e2[last].price] select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak insert into PeakStream ;

上面的查询的意思, 
e1,收到一条 event.price 20 
e2,后续收到的所有 events 的 price,都大于前一条 event 
e3,最终收到一条 event 的 price,小于前一条 event

ok,我们发现了一个peak

 

Siddhi 还有其他很多的功能,这里就不一一说明。。。。。。

 

集成到 Storm

那么最后,我们看看如何将 Siddhi 融入到我们当前的框架中,达到作为 Brain 补充的目的。

我将 Siddhi core 封装成一个 Siddhi Bolt,这样可以在 JStorm 的 topology 中很灵活的,选择是否什么方案,可以部分统计用 brain,部分用 Siddhi,非常简单。

废话不说,直接给出源码,供大家参考,

复制代码
public class SiddhiBolt implements IRichBolt {

protected OutputCollector collector;

 protected SiddhiManager siddhiManager = null;

 protected String executionPlan = null;

 ExecutionPlanRuntime executionPlanRuntime = null;

 protected HashMap String,InputHandler handlers = null;

 public SiddhiBolt(String plan) {

 this.executionPlan = plan;

 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

 this.collector = collector;

 this.siddhiManager = new SiddhiManager();

 this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

 addCallbacks();

 handlers = new HashMap String,InputHandler ();

 executionPlanRuntime.start();

 public void execute(Tuple tuple) {

 String inputStream = tuple.getSourceStreamId();

 InputHandler inputHandler = getInputHandler(inputStream);

 List Object values = tuple.getValues();

 Object[] objects = values.toArray();

 try {

 inputHandler.send(objects);

 }catch (Exception e){

 LOG.error("Send stream event error: ", e);

 // collector.fail(tuple); //test replay

 collector.ack(tuple); // remember ack the tuple

 // Make sure that add anchor tuple if you want to track it

 // collector.emit(streamid, tuple,new Values(counters, now));

 public InputHandler getInputHandler(String streamName){

 InputHandler handler = null;

 if(handlers.containsKey(streamName))

 handler = handlers.get(streamName);

 else {

 handler = executionPlanRuntime.getInputHandler(streamName);

 if (handler != null) {

 handlers.put(streamName, handler);

 return handler;

 //Need Override

 public void addCallbacks( ){

 //StreamCallback example

 executionPlanRuntime.addCallback("outputStream", new StreamCallback() {

 @Override

 public void receive(Event[] events) {

 LOG.info("receive events: " + events.length);

 for (Event e:events)

 LOG.info(e);

 //QueryCallback example

 executionPlanRuntime.addCallback("query1", new QueryCallback() {

 @Override

 public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {

 printEvents(timeStamp, inEvents, removeEvents);

 public void printEvents(long timeStamp, Event[] inEvents, Event[] removeEvents){

 StringBuilder sb = new StringBuilder();

 sb.append("Events{ @timeStamp = ").append(timeStamp).append(", inEvents = ").append(

 Arrays.deepToString(inEvents)).append(", RemoveEvents = ").append(Arrays.deepToString(removeEvents)).append(" }");

 LOG.info(sb.toString());

 public void cleanup() {

 //Shutting down the runtime

 executionPlanRuntime.shutdown();

 //Shutting down Siddhi

 siddhiManager.shutdown();

}

2015-12-15

大咖说·对话生态|当Confluent遇见云:实时流动的数据更有价值 Confluent如何实现高速增长?如何帮助客户实现数字化转型?有哪些成功经验值得借鉴?本期大咖说,阿里云联合Confluent一同探讨企业在大数据应用方面的数字化转型之道。
云计算情报局预告|告别 Kafka Streams,让轻量级流处理更加简单 消息的流式计算主要采取接入Flink、Kafka Streams等技术方案,但面对70%以上都是简单流场处理景的需求,传统方案的弊端会被不断放大,客户仍然需要投入较大的人力成本和较高的资源,同时整个架也比较更复杂。消息队列Kafka版 发布Kafka-ETL组件,是一款免运维的流计算组件,基于低代码开发可满足包括格式转换、内容富化、本地聚合、路由分发等常用的数据处理需求。
微软推人人可用的机器学习,打通windows应用程序任督二脉,惠及5000万开发者 微软「Build开发者大会」首次线上开幕,CEO Nadella对开发者倾诉衷肠,微软此次推出了WhiteNoise等多个机器学习工具包,让你的机器学习模型更上一层楼,Project Reunion一统Windows应用程序开发!更有量子计算平台预览版,HoloLens、Office套件等强力升级更新。
汽车之家基于 Flink 的数据传输平台的设计与实践 数据接入与传输作为打通数据系统与业务系统的一道桥梁,是数据系统与架构中不可或缺的一个重要部分。数据传输系统稳定性和准确性,直接影响整个数据系统服务的 SLA 和质量。此外如何提升系统的易用性,保证监控服务并降低系统维护成本,优雅应对灾难等问题也十分重要。