大数据基础之Logstash(6)mongo input
2023-09-14 09:00:07 时间
logstash input插件之mongodb是第三方的,配置如下:
input { mongodb { uri => 'mongodb://mongo_server:27017/db' placeholder_db_dir => '/path/to/db_dir/' placeholder_db_name => 'table.db' collection => 'table' batch_size => 5000 } }
安装
./logstash-plugin install logstash-input-mongodb
插件实现非常简单,就一个ruby文件,
https://github.com/phutchins/logstash-input-mongodb/blob/master/lib/logstash/inputs/mongodb.rb
使用sqlite来维护状态,db文件目录在 placeholder_db_dir,可以直接通过sqlite命令查看和修改
# sqlite3 /path/to/db_dir/table.db
db结构
sqlite> .schema CREATE TABLE `since_table` (`table` varchar(255), `place` Int); sqlite> select * from since_table order by place desc limit 1; logstash_since_table|5d0b2c2682b7d74de069ce4d
插件中取place代码
public def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) since = sqlitedb[SINCE_TABLE] x = since.where(:table => "#{since_table}_#{mongo_collection_name}") if x[:place].nil? || x[:place] == 0 first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}") return first_entry_id else @logger.debug("placeholder already exists, it is #{x[:place]}") return x[:place][:place] end end
place取自mongo的_id
> db.table.find().limit(1).pretty() { "_id" : ObjectId("5b48cd2382b7d752b802de31"), ...
可以手工通过sqlite的update命令来操作进度;
同步过程日志
D, [2019-06-20T16:21:31.938302 #28968] DEBUG -- : MONGODB | 47.92.149.159:27017 | db.find | STARTED | {"find"=>"table", "filter"=>{"_id"=>{"$gt"=>BSON::ObjectId('5d0b420782b7d74de069db7b')}}, "limit"=>10000} D, [2019-06-20T16:21:31.941658 #28968] DEBUG -- : MONGODB | 47.92.149.159:27017 | db.find | SUCCEEDED | 0.002s
读place,从place开始取10000条,然后写place,如此往复
参考:https://github.com/phutchins/logstash-input-mongodb
相关文章
- 数据透视表上线!如何在纯前端实现这个强大的数据分析功能?
- Python基础21-网络编程
- 大数据必学Java基础(五十六):LinkedList实现类的使用
- 数据统计分析的16个基础概念
- Repeater使用方法—基础数据绑定+多级嵌套「建议收藏」
- 大数据学习之Linux基础[通俗易懂]
- Python学习总结之基础语法知识汇总(一)
- 大数据必学Java基础(八十):网络编程的深入了解
- 大数据必学Java基础(八十一):基于TCP的网络编程
- 【原创】Java基础面试题②
- 一、基础折线图详解《手把手教你 ECharts 数据可视化详解》
- salesforce零基础学习(一百二十三)Transaction Security 浅入浅出
- 大数据必学Java基础(一百一十五):Session域监听器
- Python基础(十五):推导式的讲解
- MySQL 之基础命令(精简笔记)
- 打通数据价值链,百分点数据科学基础平台实现数据到决策的价值转换 | 爱分析调研
- 英语基础-倒装句型·部分倒装
- R语言基础笔记-04(字符串、数据框、条件与循环)
- Linux基础:Linux信号机制详解
- MySQL与PHP的基础与应用专题之数据查询语句
- 使用jqMobi开发app基础:Grid布局详解手机开发
- Spark入门实战系列–2.Spark编译与部署(上)–基础环境搭建详解大数据
- java基础之switch语句的深入解析详解编程语言
- 掌握Linux系统基础 从入门到精通(linuxba)
- 命令Linux终端中的基础常用命令(常用 linux)
- 建立Oracle中间库给数据传递奠定基础(oracle中间库建立)
- Redis实战从基础到精通(基础篇一文带你掌握redis)