zl程序教程

您现在的位置是:首页 >  后端

当前栏目

通过Fluentd实时上传数据到DataHub实践

实时上传数据 实践 通过
2023-09-11 14:17:38 时间
本文把我通过Flunetd,把数据上传到DataHub的配置过程记录下来,希望对大家在配置中能有帮助。

本文把我通过Fluentd,把数据上传到DataHub的配置过程记录下来,希望对大家在配置中能有帮助。

本文使用一台CentOS 6.8的ECS来做测试的,机器上已经有yum。用户测试的时候,需要有一台能连接上公网的Linux机器。


安装依赖包

`yum -y install gcc gcc-c++ openssl* readline* ncurses* zlib* libxml* libjpeg* libpng* libxslt* libtool* `
下载并安装包
wget http://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/fluentd-with-datahub-0.12.23.tar.gz file=fluentd-with-datahub-0.12.23.tar.gz

tar -xzvf fluentd-with-datahub-0.12.23.tar.gz

cd fluentd-with-datahub

这个wget的下载地址,是从文档里拿到的最新的安装包。以后版本可能会更新,用户也可以直接从文档上拿最新的安装包。

安装

目前安装包里只包含了ruby和插件的安装,没有提供fluentd的安装部分的脚本,这里可以做修改

vi install.sh

##修改里面的gem install --local fluent-plugin-datahub-0.0.1.gem,改成

gem install --local string-scrub-0.0.5.gem

gem install --local thread_safe-0.3.5.gem

gem install --local tzinfo-1.2.2.gem

gem install --local tzinfo-data-1.2016.4.gem

gem install --local sigdump-0.2.4.gem

gem install --local http_parser.rb-0.6.0.gem

gem install --local cool.io-1.2.3.gem 

gem install --local yajl-ruby-1.2.1.gem

gem install --local msgpack-0.5.12.gem

gem install --local fluentd-0.12.23.gem

gem install --local fluent-plugin-datahub-0.0.2.gem

##注意以上版本号都是一键安装包里已经有的,以后如果版本更新,这里的版本号也从dependency_gem里看看当时提供的版本是多少

##wq保存退出

sudo sh install.sh

最终结果是
b4

DataHub配置

在DataHub的控制台里创建一个Project,然后创建一个Topic(Topic对应到数据库是一个表的概念)
b1
因为只是测试,我这里就3个字段。

我们在ECS上,用touch /tmp/test.log生成一个文件。我们通过fluentd监控这个文件的变化,并把变化的内容解析后传到datahub里。

配置Fluentd

这里,我们的datahub配置文件fluentd.conf的内容为

 source 

 @type tail

 path /tmp/test.log

 tag test

 format /(? request_time \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s\[(? level .+)\]\s(? content .+)/

 /source 

 match test 

 @type datahub

 access_id your_access_id

 access_key your_access_key

 endpoint http://dh-cn-hangzhou.aliyuncs.com

 project_name your_project_name

 topic_name fluentd

 column_names ["request_time", "level", "content"]

 /match 

这里我把我的project_name和access id/key信息给隐藏了。endpoint 的地址看这里,我纯测试,就用了公网地址。
配置文件生成好了后,使用bin/fluentd -c fluentd.conf启动fluentd

我们通过date "+%Y-%m-%d %H:%M:%S [Info] login success" /tmp/test.log往/tmp/test.log里写日志,然后在fluentd 上,我们可以看到日志为:
b2
来到datahub上可以看到新写进去的日志:
b3
至此大功告成,/tmp/test.log上的新增数据会被实时推送到DataHub里了。


日志数据投递到MaxCompute最佳实践 日志服务采集到日志后,有时需要将日志投递至MaxCompute的表中进行存储与分析。本文主要向用户介绍将数据投递到MaxCompute完整流程,方便用户快速实现数据投递至MaxCompute。
Kafka数据入湖OSS实践 本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。我们知道,阿里云OSS提供了灵活、海量、高性价比的
阿里云Dataworks离线数据同步写入Kafka 之前介绍过Dataworks实时数据同步(Kafka - maxcompute),这里介绍如何使用离线同步的方法,将maxcompute数据同步到阿里云Kafka Topic。
SLS 数据加工 vs 自建 Logstash 阿里云 SLS 是云上一站式大数据处理、分析平台。SLS 数据加工功能则是一个可托管、高可用、可扩展的数据处理服务,广泛适用于数据的规整、富化、分发、汇总、重建索引等场景。
基于LogStash插件采集数据到阿里云Datahub DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点,原生支持对接阿里云的多项服务,相关功能特点与Kafka类似。本身主要介绍如何使用LogStash采集数据写入Datahub。
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。