zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

《Storm分布式实时计算模式》——3.4 Trident运算

2023-09-11 14:19:10 时间

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第3章,第3.4节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.4 Trident运算

时间戳已经生成好了,下一步是加入处理事件的逻辑组件。在Trident中,这些组件称为运算(operation)。在我们的topology中,使用两种不同的运算:filter和function。
运算通过Stream对象的方法来调用。这个例子中,我们使用了Stream对象的下述方法:


 a href=https://yqfile.alicdn.com/46e7bea64528ebdb3f64b3cb53140489e03591ff.png
"

注意前面代码中列出的方法返回形式为Stream对象或者TridentState对象,返回可以用来创建新的数据流。因此,运算可以连在一起使用流式接口形式的Java代码。让我们再看看示例topology中的关键代码:

 a href=https://yqfile.alicdn.com/f0d2d1524271a5239e587c3c0700475aeec9a7b1.png
"

通常,应用运算需要声明一个输入域集合和一个输出域集合,也就是funcition域。上面代码中topology第二行声明我们需要CityAssignment对数据流中的每个tuple执行操作。在每个tuple中,CityAssignment会在event字段上运算并且增加一个叫做city的新字段,这个字段会附在tuple中向后发射。
每个操作在流式风格的语法上略有不同,这取决于操作需要哪些信息。下面将介绍不同操作的详细语法和语义。
3.4.1 Trident filter
我们topology逻辑中的第一部分就是个过滤器filter,它会忽略掉我们不关心的疾病事件。在这个例子中,系统只关心脑膜炎(meningitis)的病情,从前面表格中看到,脑膜炎对应的疾病代码是320、321和322。
为了通过疾病代码过滤事件,我们需要利用Trident filter。Trident通过提供BaseFilter类,我们通过实现子类就可以方便地对tuple进行过滤,滤除系统不需要的tuple。BaseFilter类实现了Filter接口,这个接口如下面代码片段所示:


446ccb57e74454f084a446537c716ba03d54a5fa

为了在数据流中过滤tuple,应用需要通过继承BaseFilter类来实现这个接口。这个例子中,我们使用下述过滤器过滤事件:

ba130363a5f4f5faf6ab6c07be99f8374d7aa01b

上面的代码中,我们从tuple中提取了DiagnosisEvent并且检查疾病代码。因为所有的脑膜炎代码小于等于322,我们也没有发送其他代码,所以只需要简单地检查代码是否小于322,就可以决定事件是否和脑膜炎有关。
Filter操作结果返回True的tuple将会被发送到下游进行操作。如果方法返回False,该tuple就不会发送到下游。
在我们的topology中,我们在数据流上使用each(inputFields,filter)方法,将这个过滤器应用到数据流的每个tuple中:

3d4e63bda34a7fb899bf489211c25f75393fe723

3.4.2 Trident function
在filter之外,Storm还提供了一个更通用功能的接口function。function和Storm的bolt类似,读取tuple并且发送新的tuple。其中一个区别是,Trident function只能添加数据。function发送数据时,将新字段添加在tuple中,并不会删除或者变更已有的字段。
function接口如下代码片段所示:

 a href=https://yqfile.alicdn.com/beff14fd4d5d776524b631d21ff364c68ce26b01.png
"

和Storm的bolt类似,function实现了一个包括实际逻辑的方法execute。function的实现也可以选用TridentCollector来发送tuple到新的function中。用这种方式,function也可以用来过滤tuple,起到filter的作用。
我们topology中的第一个function是CityAssignment,如下所示:

 a href=https://yqfile.alicdn.com/96f5af484cf5f33107478974af7e25235245791e.png
"


b7288346e87f87e02aa44920100f5cb20fb00024

在这个function中,我们使用静态初始化的方式建立了一个我们关心的城市的地图。示例中,function包括一个地图,存储了的坐标信息包括:Philadelphia(PHL)、New York City(NYC)、San Francisco(SF)和LosAngeles(LA)。
在execute()方法中,函数遍历城市计算事件和城市之间的距离。现实系统中,地理空间的索引效率会高很多。
function声明的字段数量必须和它发射出值的字段数一致。如果不一致,Storm就会抛出IndexOutOfBoundsException异常。
我们topology中的下一个function是HourAssignment,用来转化Unix时间戳为纪元时间的小时,可以用来对事件发生进行时间上的分组操作。HourAssignment的代码如下:

df7858b7101a4526516be6950ab431194768bbc0

f4e9bbc2d64df03fdfebda9ef1f69a322c9a8848

我们重写了这个function,同时发射了小时的数值,以及由城市、疾病代码、小时组合而成的key。实际上,这个组合值会作为聚合计数的唯一标识符,后面会详细解释。
我们topology中最后两个funciton是用来侦测疾病暴发并且告警的。OutbreakDetector类的代码如下:

7ae5e5fe1010d6caa04985485d3f6f79e48c8dcf

这个function提取出了特定城市、疾病、时间的发生次数,并且检查计数是否超过了设定的阈值。如果超过,发送一个新的字段包括一条告警信息。在上面代码里,注意这个function实际上扮演了一个过滤器的角色,但是却作为一个function的形式来实现,是因为需要在tuple中添加新的字段。因为filter不能改变tuple,当我们既想过滤又想添加字段时必须使用function。
最后一个function的功能就是发布一个告警(并且结束程序)。代码如下:


6a036fcc8d96405ffa8d79ac6bf8e3858987c9f3

 a href=https://yqfile.alicdn.com/f6fa126f5a8efa4d10f9909240f12104e8f1e658.png"

这个方法非常简单,提取了告警的内容,并写入日志,最后结束整个程序。
【Flink】深入理解Flink-On-Yarn模式 Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式,本文分析两种模式及启动流程。
Flink 1.12 yarn-cluster模式触发Savepoint with Yarn指定-yid报异常failed timeout问题及解决 官方给出触发Savepoint with YARN的命令指定了-yid,测试后发现不应指定-yid。分析应该是早期版本需指定-yid,后期版本(至少Flink 1.12)不需要指定-yid,而官网文档未及时更新这个细节问题。
Flink 中的应用部署:当前状态与新应用模式 作为现代企业的重要工具,流处理和实时分析这类工具逐渐兴起,越来越多的企业以 Apache Flink 为核心构建平台,并将其作为服务在内部提供。
实时计算Flink on Kubernetes产品模式介绍 Flink产品介绍 目前实时计算的产品已经有两种模式,即共享模式和独享模式。这两种模式都是全托管方式,这种托管方式下用户不需要关心整个集群的运维。其次,共享模式和独享模式使用的都是Blink引擎。这两种模式为用户提供的主要功能也类似, 都提供开发控制台; 开发使用的都是Blink SQL,其中独享模式由于进入了用户的VPC,部署在用户的ECS上,因此可以使用很多底层的API,如UDX; 都提供一套的开箱即用的metric收集、展示功能; 都提供作业监控和报警功能。
我们在使用淘宝时,与店家交流时,你根本不知道后面的小二是一个人还是多个人共用一个账号,还有可能是多个人使用了多个账号但是对消费者只看到一个。