使用Spark Streaming转换不同的JSON有效负载
【51CTO.com快译】Spark Streaming 是底层基于 Spark Core 的对大数据进行实时计算的框架,可以流方式从源读取数据。只需要从数据源创建一个读取流,然后我们可以创建写入流以将数据加载到目标数据源中。
接下来的演示,将假设我们有不同的 JSON 有效负载进入一个 kafka 主题,我们需要将其转换并写入另一个 kafka 主题。
创建一个ReadStream
为了能连续接收JSON有效负载作为消息。我们需要首先读取消息并使用spark的readstream创建数据帧。Spark 中提供了 readStream 函数,我们可以使用这个函数基本上创建一个 readStream。这将从 kafka 主题中读取流负载。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
我们可以创建一个 case-class(例如CustomerUnion),它将包含JSON有效负载的所有可能字段。这样,我们就能在数据帧上运行select查询而不会失败。
val rawDfValue = rawData.selectExpr("CAST(value AS STRING)").as[String]
val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]
val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")
extractedDFWithSchema.createOrReplaceTempView(“tempView”)
这将为我们提供一个数据帧提取的 DFWithSchema,其中包含作为有效负载字段的列。
示例输入负载
这是两个样本输入有效负载,但也可以有更多的有效负载,有些字段不存在(变量)。
{
“id”: 1234,
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}
{
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}
样例输出负载
根据id字段,我们将决定输出有效负载。如果存在一个 id 字段,我们将把它视为一个用户更新案例,并且在输出有效负载中只发送“Email”和“Phone”。我们可以根据某些条件配置任何字段。这只是一个例子。
如果 id 不存在,我们将发送所有字段。下面是两个输出载荷的示例:
{
“userid”: 1234,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}
{
“fullname”:”Jon Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}
开始WriteStreams
一旦我们有了数据帧,我们就可以运行尽可能多的sql查询,并根据所需的有效负载写入 kafka 主题。因此,我们可以创建一个包含所有sql查询的列表,并通过该列表进行循环,并调用writeStream函数。让我们假设,我们有一个名为 queryList 的列表,它只包含字符串(即sql查询)。
下面为写入流定义的一个函数:
def startWriteStream(query: String): Unit = {
val transformedDf = spark.sql(query)
transformedDf
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
}
这将启动列表中每个查询的写入流。
queryList.foreach(startWriteStream)
spark.streams.awaitAnyTermination()
如果我们知道输入有效负载的所有可能字段,那么即使有一些字段不存在,我们的sql查询也不会失败。我们已经将有效负载的模式指定为case-class,它将为缺席字段创建指定 NULL 的数据帧。
通过这种方式,我们可以使用 spark-streaming 在所需的转换/过滤器之后将多个有效负载从同一主题写入不同的主题。
【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】
相关文章
- 对话南大智慧城市:技术之外,智慧城市建设还需要人、土地和投融资 | 镁客·请讲
- 「中国芯」提速!首个国产「小芯片」标准发布,专家这样说
- 中电金信曲向阳:将「敏捷」和「精益」融入企架,提升金融服务应用效能
- 技术汇总:第十八章:枚举的简单使用
- 灵犀微光陈飞:AR短期仍将深耕B端,光学模组会成为「胜负手」 | 镁客·请讲
- Flowable 设置任务处理人的四种方式
- Win7/2008R2 蓝屏0x0000007e
- 有了这个网站,妈妈再也不用担心我找不到好看的配图了!
- 网站长辅助工具:违规违禁关键词过滤
- 这个网站,可以一键爬取网页上的所有图片!
- 知识点!你知道什么是幂等请求吗?
- 漫画 | 图灵奖是怎么来的?
- 推荐一个方便好用的 ChatGPT 客户端!
- Flowable 任务如何认领,回退?
- Flowable 按角色分配任务
- 七年,网易云信做新「高效连接」
- 智能家居赛道再现“鲶鱼”,“智有范”敲碎“三重门”
- redis位图-bitmap
- YVR双十一杀出,VR市场上演“干翻巨头”戏码
- 鏖战双11!ELEGOO助力LCD光固化3D打印机强势破圈