logstash中将kafka数据直接存储到es中
2023-09-14 09:12:21 时间
下载
建议到官网下载最新版
https://www.elastic.co/cn/downloads/logstash
本文使用logstash7.0.0
https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
tar -xzvf logstash-7.0.0.tar.gz
mv logstash-7.0.0.tar.gz /usr/local/logstash
如图,Logstash Pipeline中input和output为必须元素,filter为可选元素,input插件使用来自源的数据,filter插件在您指定时修改数据output插件将数据写入目标。
我们可以通过运行最简单Logstash Pipeline来进行测试,步骤如下:
进入Logstash的解压目录,通过-e参数运行一个管道
cd logstash-7.0.0
./bin/logstash -e 'input { stdin { } } output { stdout {} }'
1
2
-e可以直接从命令行指定配置。通过在命令行指定配置,可以快速测试配置,而无需在迭代之间编辑文件。示例中的管道从标准输入stdin获取输入数据,并以结构化格式将输入数据移动到标准输出stdout 。
input { kafka { bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"] group_id => "elastic_group" topics => ["elastic_test"] consumer_threads => 12 decorate_events => true } } output { elasticsearch { hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"] index => "elastic_test" password => "XXX" user => "elastic" } }
logstash版本为5.5.3,kafka版本为2.11,此版本默认内置了kafka插件,可直接配置使用,不需要重新安装插件;注意logstash5.x版本前后配置不太一样,注意甄别,必要时可去elasticsearch官网查看最新版配置参数的变化,例如logstash5.x版本以前kafka插件配置的是zookeeper地址,5.x以后配置的是kafka实例地址。 input{ kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test" group_id => "test" auto_offset_reset => "latest" //从最新的偏移量开始消费 consumer_threads => 5 decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中 topics => ["logq","loge"] //数组类型,可配置多个topic type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用 } } 使用了decorate_events属性,注意看logstash控制台打印的信息,会输出如下 "kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null} 另外一个input里面可设置多个kafka, input{ kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test1" group_id => "test1" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["loge"] type => "classroom" } kafka{ bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"] client_id => "test2" group_id => "test2" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["logq"] type => "student" } } 假如你在filter模块中还要做其他过滤操作,并且针对input里面的每个数据源做得操作不一样,那你就可以根据各自定义的type来匹配 filter{ if[type] == "classroom"{ grok{ ........ } } if[type] == "student"{ mutate{ ........ } } } 不只filter中可以这样,output里面也可以这样;并且当output为elasticsearch的时候,input里面的定义的type将会成为elasticsearch的你定义的index下的type output { if[type] == "classroom"{ elasticsearch{ hosts => ["192.168.110.31:9200"] index => "school" timeout => 300 user => "elastic" password => "changeme" } } if[type] == "student"{ ........ } } 对于第一个存储到elasticsearch的路径为localhost:9200/school/classroom;第二个存储到elasticsearch的路径为localhost:9200/school/student。假如从来没有定义过type,默认的type为logs,访问路径为第一个存储到elasticsearch的路径为localhost:9200/school/logs,默认的type也可不加。
读取文件直接发送到es
- 修改/usr/local/logstash/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
#beats {
# port => 5044
#}
file {
path => "/var/log/httpd/access_log"
start_position => beginning
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][logstash]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
- 检查配置文件是否正确:(假设当前目录为/usr/local/logstash/config/)
../bin/logstash -t -f logstash-sample.conf
启动:
../bin/logstash -f logstash-sample.conf
加载本文件夹所有配置文件启动:
../bin/logstash -f ./
或后台启动:
nohup ../bin/logstash -f config/ &
- 常用命令参数
-f:通过这个命令可以指定Logstash的配置文件,根据配置文件配置logstash
-e:后面跟着字符串,该字符串可以被当做logstash的配置(如果是“” 则默认使用stdin作为输入,stdout作为输出)
-l:日志输出的地址(默认就是stdout直接在控制台中输出)
-t:测试配置文件是否正确,然后退出。
相关文章
- es 复合数据类型——数组,对象和嵌套
- php 使用 ElasticSearch/es 的最佳实践
- Kafka压测— 搞垮kafka的方法(转)
- 自己玩KAFKA 版本 kafka_2.13-3.2.1
- TextureView+SurfaceTexture+OpenGL ES来播放视频(二)
- Android OpenGL ES(六)创建实例应用OpenGLDemos程序框架 .
- 有关于OpenGL、OpenGL ES、WebGL的小结
- kafka详解一、Kafka简介
- QuantitativeFinance:量化金融之金融时间序列分析之ES/ETS/GARCH模型的简介、Box-Jenkins方法-AR/MA/ARMA/ARIMA模型的简介及其建模四大步骤之详细攻略
- kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
- es定期删除数据
- ELK-filebeat收集日志到Kafka,并转存ES
- es上的的Watcher示例
- 图数据库HugeGraph——这个无非是利用cassandra+ES作为后端来做的图数据库,支持分布式而已,要说性能,肯定是没有原生的neo4j强的
- ES _source字段介绍——json文档,去掉的话无法更新部分文档,最重要的是无法reindex
- ES doc_values介绍——本质是field value的列存储,做聚合分析用,ES默认开启,会占用存储空间(列存储压缩技巧,除公共除数或者同时减去最小数,字符串压缩的话,直接去重后用数字ID压缩)
- ES bulk源码分析——ES 5.0
- ES(ElasticSearch) 索引创建
- 行人重识别0-08:DG-Net(ReID)-代码无死角解读(4)-网络Es编码解码过程
- 好玩的ES--第四篇之聚合查询和集群
- 解开Kafka神秘的面纱(四):kafka stream及interceptor
- Android12之OpenSL ES中objectIDtoClass分析拆解(十三)