zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

利用 Lambda 将 Kinesis Data Stream 数据批量自动写入 MSK

2023-02-26 12:32:03 时间

背景

在混合云架构中,用户的一些应用原本运行在自建的数据中心。这些应用程序统一从 Kafka 中拉取实时数据做分析和处理,例如监控系统、大数据分析平台等。由于业务发展需要,用户将这些工作负载部分迁移到了 AWS 上,或者在 AWS 上构建新的应用。

由于 AWS 部分服务仅支持以 Kinesis Data Stream 方式输出日志或数据,例如 Pinpoint。Kinesis Data Stream 数据投递一般使用 Kinesis Firehose 或者自有应用拉取。而 Kinesis Firehose 的数据投递目标并不支持 Kafka,因此需要用户修改应用拉取数据的方式修改应用去拉取数据。如果这个应用需要同时运行数据中心和 AWS 云上,用户则需要维护支持 2 个接口的应用;或者假如这些应用的数量非常多,用户改造和测试它需要大量的人力和时间的时候;我们会希望通过一种无需运维管理的方式,能自动将 Kinesis Data Stream 中的数据导入到 Kafka 的方案。这样所有的应用无需修改,便能平滑迁移到 AWS 上,同时运维人力成本上也基本不会增加。

AWS Lambda 是一项无服务器事件驱动型计算服务。利用 AWS Lambda 的无服务器特性,我们可以做到无论需要对接多少个 Kinesis Data Stream 都可以支持;每个 Kinesis Data Stream 需要接收多大的数据流量,我们都能自动扩展;而底层资源无需运维管理。

解决方案

架构图

架构说明

本方案完全采用 Serverless 架构,主要通过 Lambda 实现 Kinesis 和 MSK 数据相互传输到目的。当 Lambda 执行失败的时候,会触发 SNS(可替换为SQS)以发送通知信息。

一键部署代码

参考 Github 开源代码 

部署方式

先安装 SAM-cli,具体步骤参考:安装 AWS SAMLinux 上的 CLI – AWS Serverless Application Model

然后下载代码修改参数后部署

git clone https://github.com/yourlin/Kinesis2MSK
cd Kinesis2MSK
sam build
sam deploy --guided

关键参数

主要配置文件 template.yaml,MSK 配置在 kinesis_to_msk/config.py

环境配置说明

全局配置

Globals:
  Function:
    Timeout: 5
    MemorySize: 128

函数超时时间默认为:5秒。

函数内存:128MB。如果加大每次批处理的数据量可适当提高内存量。

VPC 配置

VpcConfig:
        SecurityGroupIds:
          - sg-087f1d9e26d9140ad # 该安全组,是指定的 VPC 内提前创建好的
        SubnetIds:
          - subnet-0eafba9fee295d045 # 提前创建好的子网 1
          - subnet-093cb4ccabff0d3d5 # 提前创建好的子网 2
          - subnet-08c55a13b324420b8 # 提前创建好的子网 3

注意:除非 MSK 配置为公网可访问,否则必须将 Lambda 配置在和 MSK 同一个 VPC 或者跟 MSK 所在 VPC 已经打通的其他 VPC 内,否则会 lambda 无法正常连接 MSK,引起连接超时。

安全组和子网推荐使用预定义好的,子网至少选择 2 个,保证其高可用。

Kinesis 源配置

Events:
        Kinesis2MSK:
          Type: Kinesis # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Stream: arn:aws:kinesis:us-east-1:784675006790:stream/poc-kinesis # Create kinesis stream before run it
            StartingPosition: TRIM_HORIZON
            BatchSize: 10
            MaximumBatchingWindowInSeconds: 10
            Enabled: true
            ParallelizationFactor: 8
            MaximumRetryAttempts: 100
            BisectBatchOnFunctionError: true
            MaximumRecordAgeInSeconds: 604800
            DestinationConfig:
              OnFailure:
                Type: SNS
                Destination: arn:aws:sns:us-east-1:784675006790:email # Create SNS Topic before run it
            TumblingWindowInSeconds: 0
            FunctionResponseTypes:
              - ReportBatchItemFailures

详细参数说明参考文档:将 AWS Lambda 与 Amazon Kinesis 结合使用

其中 DestinationConfigDestination,可以设置成 SQS/SNS。示例中采用 SNS。

Stream 设置为目标 Kinesis 的 ARN

网络

如果 Lambda 和 MSK 不在同一个 VPC 内(包括跨 region 的情况),2 个 VPC 之间需要做 Peering,Lambda 才能访问 MSK。VPC Peering 创建方法参考:创建 VPC 对等连接

安全性

安全组设置

同 region 访问:

  • MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的安全组
  • Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的安全组

跨 region 访问:

  • MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的所在 VPC 的 subnet CDIR。如果 Lambda 部署在多个 subnet 中,需要允许多个 CIDR 访问。
  • Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的所在 VPC 的 subnet CDIR。

安全组配置方法参考:使用安全组控制到资源的流量

MSK 认证设置

推荐安全策略:MSK 集群使用 IAM Role 进行认证,然后对 Lambda 附加相关的 Policy。参考 Amazon MSK 如何与 IAM 协同工作

其他安全策略:如果 MSK 使用其他认证方式,除了用户名密码/pem 证书外,访问控制也受到安全组规则设置限制。

常见问题

问题一

KafkaTimeoutError: Failed to update metadata after 60.0 secs

原因分析

目标 Topic 不存在

可能的解决方案:

  1. 修改 MSK 集群的配置,设置 create.topics.enable=true
  2. 手动创建目标 Kafka 的 Topic

问题二

Lambda 执行超时

原因分析

Kafka 访问失败,失败原因可能是因为网络无法连通;也可能是问题一引起的超时。

可能的解决方案

  1. 检查 Lambda 和 MSK 所在的 VPC,是否为同一 VPC 或者 2 个 VPC 之间是否已经 Peering。VPC Peering 创建方法参考:创建 VPC 对等连接
  2. 检查 Lambda 使用的安全组,确保在出站规则中,允许 9092-9093 端口,目标为 MSK 使用的安全组。
  3. 检查 MSK 使用的安全组,确保在入站规则中,允许 9092-9093 端口,源包含 Lambda 使用的安全组。
  4. 如果 MSK 使用用户名密码方式访问,请确保用户名密码正确。

其他优化

在数据量足够大的情况下,需要调节 BatchSize,加大 ParallelizationFactor。在调整参数后需要测试实际数据传输的时间,并且调整 Lambda 执行的超时时间。

本篇作者

林业

AWS解决方案架构师,负责基于AWS的云计算方案的咨询与架构设计。拥有超过14年研发经验,曾打造千万级用户APP,多项Github开源项目贡献者。在游戏、IoT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。

Dora Gui

AWS技术客户经理,主要支持游戏,互联网行业客户的架构优化、成本管理、技术咨询等工作,并专注在IAAS,大数据和容器等方向的技术选型,方案落地和实践。在加入AWS之前,曾就职于EMC和微软,腾讯等科技公司,拥有近10年虚拟化与公有云领域的架构优化和技术支持经验。