Java Architect - Distributed (5): Distributed Message Queue (2) --> Kafka [with ELK for massive log collection (Elasticsearch, Logstash, Kibana)] [Spring Boot integration]
2023-09-27 14:20:41
I. Kafka and Spring Boot integration
1. Producer
2. Consumer
II. Architecture design
III. log4j2 log output
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.frxs</groupId>
<artifactId>collector</artifactId>
<version>1.0.0</version>
<name>collector</name>
<description>collector</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- exclude spring-boot-starter-logging so log4j2 can take over -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
<build>
<finalName>collector</finalName>
<!-- include .properties and .xml files when packaging -->
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<!-- whether to substitute Maven properties in these resources -->
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.bfxy.collector.Application</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
<Properties>
<Property name="LOG_HOME">logs</Property>
<Property name="FILE_NAME">collector</Property>
<Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
</Properties>
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="${patternLayout}"/>
</Console>
<RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Filters>
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<!-- application loggers (async); the original defined two AsyncLoggers with the same name, which log4j2 rejects as a duplicate, so they are merged here; additivity is off so events are not logged twice via Root -->
<AsyncLogger name="com.bfxy" level="info" additivity="false" includeLocation="true">
<AppenderRef ref="CONSOLE"/>
<AppenderRef ref="appAppender"/>
<AppenderRef ref="errorAppender"/>
</AsyncLogger>
<Root level="info">
<AppenderRef ref="CONSOLE"/>
<AppenderRef ref="appAppender"/>
<AppenderRef ref="errorAppender"/>
</Root>
</Loggers>
</Configuration>
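The patternLayout above reads `%X{hostName}`, `%X{ip}` and `%X{applicationName}` from the SLF4J MDC, so the application has to populate those keys itself (typically once at startup, e.g. with `MDC.put("hostName", ...)`). A minimal sketch of resolving those values; the helper class name is an assumption, not from the original project:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Resolves the values that the log4j2 pattern reads via %X{hostName} and
// %X{ip}. In the real application these would be pushed into org.slf4j.MDC
// once at startup, e.g. MDC.put("hostName", NetUtil.hostName()).
public class NetUtil {

    public static String hostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host"; // fall back rather than break logging
        }
    }

    public static String hostIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }

    public static void main(String[] args) {
        System.out.println(hostName() + " / " + hostIp());
    }
}
```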
IV. Filebeat log collection
1. Installing filebeat:
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
Configure filebeat; the bundled filebeat.reference.yml (called filebeat.full.yml in 5.x) documents every available option.
vim /usr/local/filebeat-6.6.0/filebeat.yml
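The article never shows the filebeat.yml itself. A sketch consistent with the two log files written by log4j2 and the two kafka topics created below; the log paths, broker address and multiline pattern are assumptions:

```yaml
filebeat.inputs:
- type: log
  paths:
    - /usr/local/logs/app-collector.log      # file written by the appAppender
  multiline.pattern: '^\['                   # a new log event starts with "["
  multiline.negate: true
  multiline.match: after
  fields:
    logtopic: app-log-collector              # routed to this kafka topic
- type: log
  paths:
    - /usr/local/logs/error-collector.log    # file written by the errorAppender
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after
  fields:
    logtopic: error-log-collector

output.kafka:
  enabled: true
  hosts: ["192.168.11.111:9092"]
  topic: '%{[fields.logtopic]}'              # pick the topic per input
  compression: gzip
  required_acks: 1
```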
2. Starting filebeat:
Check that the configuration is valid (the 5.x -configtest flag was replaced by the test subcommand in 6.x)
cd /usr/local/filebeat-6.6.0
./filebeat test config -c filebeat.yml
Start filebeat
/usr/local/filebeat-6.6.0/filebeat &
ps -ef | grep filebeat
Start Kafka:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
List topics:
kafka-topics.sh --zookeeper 192.168.11.111:2181 --list
Create the two topics
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
Describe a topic
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe
V. Logstash log filtering
# unpack and install
tar -zxvf logstash-6.6.0.tar.gz -C /usr/local/
## files under config/:
# logstash settings: config/logstash.yml
# JVM options: config/jvm.options
# log format settings: config/log4j2.properties
# Linux service parameters: config/startup.options
vim /usr/local/logstash-6.6.0/config/logstash.yml
## raising the number of worker threads can noticeably improve logstash throughput
pipeline.workers: 16
## start logstash
nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
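The logstash-script.conf referenced above is not shown in this chunk. A sketch of what it plausibly contains, consuming the two kafka topics and writing them to daily ES indices matching the watcher queries below; the group ids, field names and addresses are assumptions:

```conf
input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.111:9092"
    codec => json
    decorate_events => true
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.111:9092"
    codec => json
    decorate_events => true
    group_id => "error-log-group"
  }
}
filter {
  ## parse the log4j2 pattern into fields the watcher queries on,
  ## e.g. level and currentDateTime (grok/ruby parsing omitted here)
}
output {
  if [fields][logtopic] == "app-log-collector" {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-collector-%{+YYYY.MM.dd}"
    }
  }
  if [fields][logtopic] == "error-log-collector" {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-collector-%{+YYYY.MM.dd}"
    }
  }
}
```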
VI. Elasticsearch & Kibana storage and visualization
1. Setting up ES 6.6.0
## unpack elasticsearch-6.6.0.tar.gz on all three nodes
tar -zxvf elasticsearch-6.6.0.tar.gz -C /usr/local/
## edit the configuration file:
vim elasticsearch-6.6.0/config/elasticsearch.yml
### elasticsearch.yml settings
cluster.name: es_log_cluster
node.name: es-node-1 ## use es-node-2 / es-node-3 on the other nodes
path.data: /usr/local/elasticsearch-6.6.0/data ## ES data directory
path.logs: /usr/local/elasticsearch-6.6.0/logs ## ES log directory
bootstrap.memory_lock: true ## lock the heap in RAM so it is never swapped out, keeping ES stable
network.host: 192.168.0.238 ## bind/publish address; use each node's own IP
## split-brain prevention
## seed host list used for node discovery when a new node joins.
## the default list is ["127.0.0.1", "[::1]"]
discovery.zen.ping.unicast.hosts: ["192.168.0.238:9300", "192.168.0.239:9300", "192.168.0.240:9300"]
## minimum number of master-eligible nodes; to avoid split brain set it to (master-eligible nodes / 2 + 1)
discovery.zen.minimum_master_nodes: 2
# after a full cluster restart, wait until N nodes are up before starting recovery
gateway.recover_after_nodes: 2
# how many nodes (master or data) the cluster is expected to have
gateway.expected_nodes: 3
### how long to wait for expected_nodes once recover_after_nodes is met, e.g. with 2 nodes up, wait up to 5 minutes for the 3rd before recovering anyway
gateway.recover_after_time: 5m
# do not allow more than one node to start on the same machine
node.max_local_storage_nodes: 1
# deleting an index requires its explicit name
action.destructive_requires_name: true
# keep a shard's primary and replica off the same host
cluster.routing.allocation.same_shard.host: true
####################################################
# this is where the 6.6.0 install differs
# generate elastic-stack-ca.p12 with bin/elasticsearch-certutil ca
# copy the generated file to /usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12 on every node
# set all built-in passwords to 123456 with the bundled tool:
# bin/elasticsearch-setup-passwords interactive
# add these settings
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12
xpack.security.transport.ssl.truststore.path: /usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12
####################################################
## generate the CA certificate (run on one node, then copy the keystore to the other nodes)
elasticsearch-certutil ca
mv elastic-stack-ca.p12 ../config/
## fix ownership
chown -R baihezhuo:baihezhuo /usr/local/elasticsearch-6.6.0/
## test start
/usr/local/elasticsearch-6.6.0/bin/elasticsearch
## cracking x-pack:
## replace two classes inside elasticsearch-6.6.0/modules/x-pack-core/x-pack-core-6.6.0.jar:
## overwrite LicenseVerifier.class under org/elasticsearch/license/ in the jar with the patched copy
## overwrite XPackBuild.class under org/elasticsearch/xpack/core/ in the jar with the patched copy
## patched classes: https://pan.baidu.com/s/1ESqoZW8eieO7Zdgs31hxsQ, password: uqnd
## jar to replace:
/usr/local/elasticsearch-6.6.0/modules/x-pack-core/x-pack-core-6.6.0.jar
# set all built-in passwords to 123456 with the bundled tool
/usr/local/elasticsearch-6.6.0/bin/elasticsearch-setup-passwords interactive
## start the cluster and test:
curl -u elastic:123456 192.168.11.35:9200
## download and unpack kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.6.0-linux-x86_64.tar.gz -C /usr/local/
mv kibana-6.6.0-linux-x86_64/ kibana-6.6.0
## enter the kibana directory and edit the config file
vim /usr/local/kibana-6.6.0/config/kibana.yml
## change the settings as follows:
server.host: "0.0.0.0"
server.name: "192.168.11.35"
elasticsearch.hosts: ["http://192.168.11.35:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"
## start:
/usr/local/kibana-6.6.0/bin/kibana &
## start with an explicit config file:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## access:
http://192.168.0.236:5601/app/kibana (5601 is the default kibana port)
## apply for a license:
https://license.elastic.co/registration
## edit the downloaded license; the license.json file name must not change, otherwise validation fails
1. replace "type":"basic" with "type":"platinum" # basic edition -> platinum
2. replace "expiry_date_in_millis":1561420799999 with "expiry_date_in_millis":3107746200000 # 1 year -> 50 years
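For orientation, the relevant part of license.json looks roughly like this after the two edits; all fields other than the two edited ones are elided with "..." and the overall shape is an assumption from the downloaded file's structure:

```json
{
  "license": {
    "uid": "...",
    "type": "platinum",
    "issue_date_in_millis": "...",
    "expiry_date_in_millis": 3107746200000,
    "issued_to": "...",
    "signature": "..."
  }
}
```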
## start the elasticsearch and kibana services
## in the kibana UI, upload the edited license via Management -> License Management
## ik analyzer:
## install the elasticsearch-analysis-ik analyzer
https://github.com/medcl/elasticsearch-analysis-ik
## download https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.6.0/elasticsearch-analysis-ik-6.6.0.zip
mkdir -p /usr/local/elasticsearch-6.6.0/plugins/ik/
## upload elasticsearch-analysis-ik-6.6.0.zip to /usr/local/software
## unzip it into the /usr/local/elasticsearch-6.6.0/plugins/ik/ directory just created:
unzip -d /usr/local/elasticsearch-6.6.0/plugins/ik/ elasticsearch-analysis-ik-6.6.0.zip
## check that everything is in place
cd /usr/local/elasticsearch-6.6.0/plugins/ik/
## fix ownership again
chown -R baihezhuo:baihezhuo /usr/local/elasticsearch-6.6.0/
## restart the ES nodes; this line in the log means the ik analyzer loaded successfully
[es-node01] loaded plugin [analysis-ik]
## switch user
su baihezhuo
## run the ES service as a daemon
/usr/local/elasticsearch-6.6.0/bin/elasticsearch -d
## health checks
- node status: http://192.168.11.35:9200/
- cluster health: http://192.168.11.35:9200/_cluster/health?pretty
## graceful shutdown:
kill -SIGTERM pid
## start elasticsearch-head:
cd /usr/local/elasticsearch-head/ && /usr/local/elasticsearch-head/node_modules/grunt/bin/grunt server &
## stop head: find the process listening on head's port (9100) and kill it
netstat -tunpl | grep 9100
## head UI: http://192.168.11.35:9100?auth_user=elastic&auth_password=123456
## start kibana:
/usr/local/kibana-6.6.0/bin/kibana &
## start kibana with an explicit config file:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## kibana UI:
http://192.168.11.35:5601/app/kibana (5601 is the default kibana port)
## stop kibana: find the process listening on kibana's port (5601) and kill it
netstat -tunpl | grep 5601
2. Watcher monitoring and alerting
## create a watcher, e.g. with a trigger that runs the input search every 5 seconds
PUT _xpack/watcher/watch/error_log_collector_watcher
{
"trigger": {
"schedule": {
"interval": "5s"
}
},
"input": {
"search": {
"request": {
"indices": ["<error-log-collector-{now+8h/d}>"],
"body": {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"transform": {
"search": {
"request": {
"indices": ["<error-log-collector-{now+8h/d}>"],
"body": {
"size": 1,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
},
"sort": [
{
"currentDateTime": {
"order": "desc"
}
}
]
}
}
}
},
"actions": {
"test_error": {
"webhook" : {
"method" : "POST",
"url" : "http://192.168.11.31:8001/accurateWatch",
"body" : "{\"title\": \"Error alert\", \"applicationName\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\":\"severity P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"
}
}
}
}
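The webhook action above POSTs the alert JSON to /accurateWatch on 192.168.11.31:8001, but that receiver is not shown in the article. A minimal stand-alone sketch using only the JDK's built-in HTTP server; in the real project this would more likely be a Spring Boot @RestController parsing the body with fastjson (already in the pom), and the class name and field-extraction logic here are assumptions:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class AccurateWatchServer {

    // Extract one top-level string field from the alert JSON; deliberately
    // naive (a real service would use a JSON library such as fastjson).
    static String field(String json, String name) {
        String key = "\"" + name + "\":";
        int i = json.indexOf(key);
        if (i < 0) return "";
        int start = json.indexOf('"', i + key.length()) + 1;
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    public static void main(String[] args) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(8001), 0);
        server.createContext("/accurateWatch", exchange -> {
            String body = new String(exchange.getRequestBody().readAllBytes(),
                    StandardCharsets.UTF_8);
            // here: dedupe, rate-limit and forward to mail/SMS/IM channels
            System.out.println("[" + field(body, "level") + "] "
                    + field(body, "title") + " from " + field(body, "applicationName"));
            byte[] resp = "ok".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, resp.length);
            try (OutputStream os = exchange.getResponseBody()) { os.write(resp); }
        });
        server.start();
    }
}
```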
# view a watcher
GET _xpack/watcher/watch/error_log_collector_watcher
# delete a watcher
DELETE _xpack/watcher/watch/error_log_collector_watcher
# execute the watcher manually
# POST _xpack/watcher/watch/error_log_collector_watcher/_execute
# view the execution history
GET /.watcher-history*/_search?pretty
{
"sort" : [
{ "result.execution_time" : "desc" }
],
"query": {
"match": {
"watch_id": "error_log_collector_watcher"
}
}
}
GET error-log-collector-2019.09.18/_search?size=10
{
"query": {
"match": {
"level": "ERROR"
}
}
,
"sort": [
{
"currentDateTime": {
"order": "desc"
}
}
]
}