zl程序教程

您现在的位置是:首页 >  工具

当前栏目

logstash同步数据到es

ES同步数据 Logstash
2023-06-13 09:13:57 时间

logstash 的conf目录下mysql.conf

input {
  stdin {
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.217.130:3307/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
  # the user we wish to excute our statement as
  jdbc_user => "root"
  jdbc_password => "root"
  # the path to our downloaded jdbc driver  
  jdbc_driver_library => "D:/Repository/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar"
  # the name of the driver class for mysql
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  #要执行的sql文件
  #statement_filepath => "/conf/course.sql"
  statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
  #定时配置
  schedule => "* * * * *"
  record_last_run => true
  last_run_metadata_path => "E:/temp/logstash-6.2.1/config/xc_course_media_metadata"
  }
}


output {
  elasticsearch {
  #ES的ip地址和端口
  hosts => "192.168.217.130:9200"
  #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
  #ES索引库名称
  index => "xc_course"
  document_id => "%{id}"
  document_type => "doc"
  template =>"E:/temp/logstash-6.2.1/config/xc_course_template.json"
  template_name =>"xc_course"
  template_overwrite =>"true"
  }
  stdout {
 #日志输出
  codec => json_lines
  }
}

说明:

1、ES采用UTC时区问题

ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时

where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)

2、logstash每个执行完成会在logstash-6.2.1/config/logstash_metadata记录执行时间下次以此时间为基准进行增量同步数据到索引库。

xc_course_template.json

{
   "mappings" : {
      "doc" : {
         "properties" : {
            "charge" : {
               "type" : "keyword"
            },
            "description" : {
               "analyzer" : "ik_max_word",
               "search_analyzer" : "ik_smart",
               "type" : "text"
            },
            "end_time" : {
               "format" : "yyyy-MM-dd HH:mm:ss",
               "type" : "date"
            },
            "expires" : {
               "format" : "yyyy-MM-dd HH:mm:ss",
               "type" : "date"
            },
            "grade" : {
               "type" : "keyword"
            },
            "id" : {
               "type" : "keyword"
            },
            "mt" : {
               "type" : "keyword"
            },
            "name" : {
               "analyzer" : "ik_max_word",
               "search_analyzer" : "ik_smart",
               "type" : "text"
            },
            "pic" : {
               "index" : false,
               "type" : "keyword"
            },
            "price" : {
               "type" : "float"
            },
            "price_old" : {
               "type" : "float"
            },
            "pub_time" : {
               "format" : "yyyy-MM-dd HH:mm:ss",
               "type" : "date"
            },
            "qq" : {
               "index" : false,
               "type" : "keyword"
            },
            "st" : {
               "type" : "keyword"
            },
            "start_time" : {
               "format" : "yyyy-MM-dd HH:mm:ss",
               "type" : "date"
            },
            "status" : {
               "type" : "keyword"
            },
            "studymodel" : {
               "type" : "keyword"
            },
            "teachmode" : {
               "type" : "keyword"
            },
            "teachplan" : {
               "analyzer" : "ik_max_word",
               "search_analyzer" : "ik_smart",
               "type" : "text"
            },
            "users" : {
               "index" : false,
               "type" : "text"
            },
            "valid" : {
               "type" : "keyword"
            }
         }
      }
   },
   "template" : "xc_course"
}

指定配置文件 启动logstash.bat:

.\logstash.bat -f ..\config\mysql.conf