zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java架构师-分布式(五):分布式消息队列(2)--> Kafka【配合ELK进行海量日志收集(Elasticsearch、Logstash、Kibana)】【与SpringBoot整合】

2023-09-27 14:20:41 时间

在这里插入图片描述
在这里插入图片描述

一、Kafka与springboot整合

1、生产者

2、消费者

二、架构设计

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

三、log4j2日志输出

在这里插入图片描述
pom.xml

<!-- Maven build for the "collector" log-collection demo app (Spring Boot 2.1.5 + Log4j2 async logging). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<!-- NOTE(review): groupId is com.frxs but the Boot plugin's mainClass below is
	     com.bfxy.collector.Application — confirm which package the sources actually use. -->
	<groupId>com.frxs</groupId>
	<artifactId>collector</artifactId>
	<version>1.0.0</version>
	<name>collector</name>
	<description>collector</description>
	
  <dependencies>
      	<dependency>
          	<groupId>org.springframework.boot</groupId>
          	<artifactId>spring-boot-starter-web</artifactId>
          	<!-- Exclude the default spring-boot-starter-logging (Logback) so Log4j2 can take over. -->
          	<exclusions>
              	<exclusion>
                  	<groupId>org.springframework.boot</groupId>
                  	<artifactId>spring-boot-starter-logging</artifactId>
              	</exclusion>
          	</exclusions>
      	</dependency>
      	<dependency>
           	<groupId>org.springframework.boot</groupId>
           	<artifactId>spring-boot-starter-test</artifactId>
           	<scope>test</scope>
      	</dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency> 		
		<!-- log4j2 replacement logging starter (requires the Logback exclusion above). -->
		<dependency>
		    <groupId>org.springframework.boot</groupId>
		    <artifactId>spring-boot-starter-log4j2</artifactId>
		</dependency> 
		<!-- LMAX Disruptor: required at runtime by Log4j2 AsyncLogger. -->
	  	<dependency>
	   		<groupId>com.lmax</groupId>
	   		<artifactId>disruptor</artifactId>
	   		<version>3.3.4</version>
	  	</dependency>		
		
		<!-- NOTE(review): fastjson 1.2.58 has known deserialization RCE CVEs
		     (e.g. CVE-2019-14540 affects <=1.2.59); upgrade before production use. -->
      	<dependency>
          	<groupId>com.alibaba</groupId>
          	<artifactId>fastjson</artifactId>
          	<version>1.2.58</version>
      	</dependency>	
      	
	</dependencies>	
	
	<build>
		<finalName>collector</finalName>
    	<!-- Include .properties and .xml files that live under src/main/java in the packaged jar. -->
    	<resources>
             <resource>  
                <directory>src/main/java</directory>  
                <includes>  
                    <include>**/*.properties</include>  
                    <include>**/*.xml</include>  
                </includes>  
                <!-- Enable ${...} placeholder substitution in these resources. -->  
                <filtering>true</filtering>  
            </resource>  
            <resource>  
                <directory>src/main/resources</directory>  
            </resource>      	
    	</resources>		
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<!-- NOTE(review): package com.bfxy does not match the project groupId com.frxs — verify. -->
					<mainClass>com.bfxy.collector.Application</mainClass>
				</configuration>
			</plugin>
		</plugins>
	</build> 		
</project>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: console + rolling app/error files, with an async business logger
     (backed by the LMAX disruptor dependency declared in pom.xml). -->
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <!-- One-line layout: timestamp, level, thread, logger, MDC fields
             (hostName/ip/applicationName), source location, message, exception. -->
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <!-- All application logs: roll daily or at 500MB, keep at most 20 indexed files per day. -->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <!-- WARN and above only (ThresholdFilter); same rollover policy as appAppender. -->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Business async logger.
             FIX: Log4j2 logger names are package prefixes, not globs — "com.bfxy.*" is a
             literal name that matches no logger; "com.bfxy" covers the whole package tree.
             FIX: the two duplicate AsyncLogger definitions with the same name are merged
             into one (Log4j2 does not allow duplicate logger names).
             additivity="false" stops events from also reaching Root, which would otherwise
             write each business event twice to the same appenders. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <!-- Normalized the legacy <Appender-Ref> spelling to the standard <AppenderRef>. -->
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

四、filebeat日志收集

在这里插入图片描述

1、filebeat安装:

cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

配置filebeat,可以参考filebeat.full.yml中的配置。

vim /usr/local/filebeat-6.6.0/filebeat.yml

2、filebeat启动:

检查配置是否正确

cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest

启动filebeat

/usr/local/filebeat-6.6.0/filebeat &
ps -ef | grep filebeat

启动kafka:

/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

查看topic列表:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --list

创建topic

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1  

查看topic情况
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe

五、logstash日志过滤

在这里插入图片描述

# 解压安装
tar -zxvf logstash-6.6.0.tar.gz -C /usr/local/

## conf下配置文件说明:
# logstash配置文件:/config/logstash.yml
# JVM参数文件:/config/jvm.options
#  日志格式配置文件:log4j2.properties
#  制作Linux服务参数:/config/startup.options


vim /usr/local/logstash-6.6.0/config/logstash.yml
## 增加workers工作线程数 可以有效的提升logstash性能
pipeline.workers: 16


## 启动logstash
nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

六、Elasticsearch&kibana存储可视化

1、ES-6.6.0环境搭建


## 三个节点解压elasticsearch-6.6.0.tar.gz
tar -zxvf elasticsearch-6.6.0.tar.gz -C /usr/local/

## 修改配置文件:
vim elasticsearch-6.6.0/config/elasticsearch.yml

### elasticsearch.yml 配置
cluster.name: es_log_cluster
node.name: es-node-1    ## es-node-2  es-node-3 不同节点名称不同
path.data: /usr/local/elasticsearch-6.6.0/data  ## es数据存放位置
path.logs: /usr/local/elasticsearch-6.6.0/logs  ## es日志存放位置
bootstrap.memory_lock: true     ## 锁内存,强制占用(类似oracle的锁内存)保证es启动正常
network.host: 192.168.0.238     ## network.host不同节点IP对应 (对外发布IP)
## 防止脑裂配置
## 当新节点加入的时候,配置一个初始化主机列表用于节点发现.
## 默认的主机列表是 ["127.0.0.1", "[::1]"]
discovery.zen.ping.unicast.hosts: ["192.168.0.238:9300", "192.168.0.239:9300", "192.168.0.240:9300"]
## 最小节点数,为了避免脑裂的发生,使用如下配置(数值为节点总数/2 + 1)
discovery.zen.minimum_master_nodes: 2
# 如果集群发生重启,直到N个节点启动完成,才能开始进行集群初始化恢复动作
gateway.recover_after_nodes: 2
# 集群应该预期有几个节点(master或node都算)
gateway.expected_nodes: 3
### 等待凑齐预期节点的时间上限：达到 recover_after_nodes(2) 后，最多再等 5 分钟凑齐 expected_nodes(3)，超时则直接开始恢复
gateway.recover_after_time: 5m
# 禁止在一个操作系统启动多个节点 
node.max_local_storage_nodes: 1
# 删除索引时,需要明确的名称
action.destructive_requires_name: true
# 防止同一个分片的主副本放在同一台机器上
cluster.routing.allocation.same_shard.host: true
####################################################
#此处为6.6.0版本安装的差异点
#通过bin/elasticsearch-certutil ca生成elastic-stack-ca.p12
#将上面生成的文件复制到所有节点对应路径/usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12
#使用自带shell修改密码,将所有密码修改为123456
#bin/elasticsearch-setup-passwords interactive
#添加配置
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12
xpack.security.transport.ssl.truststore.path: /usr/local/elasticsearch-6.6.0/config/elastic-stack-ca.p12
####################################################
## 生成认证(一台节点生成,发送到其余节点上 秘钥)
elasticsearch-certutil ca
mv elastic-stack-ca.p12 ../config/

## 赋权
chown -R baihezhuo:baihezhuo /usr/local/elasticsearch-6.6.0/
## 测试启动
/usr/local/elasticsearch-6.6.0/bin/elasticsearch

## x-pack破解:
## 覆盖elasticsearch-6.6.0\modules\x-pack\x-pack-core\x-pack-core-6.6.0.jar中的两个类
## 用LicenseVerifier.class 覆盖x-pack-core-6.6.0.jar\org\elasticsearch\license目录下的同名文件
## 用 XPackBuild.class 覆盖 x-pack-core-6.6.0.jar\org\elasticsearch\xpack\core 目录下的同名文件
## 类获取地址https://pan.baidu.com/s/1ESqoZW8eieO7Zdgs31hxsQ,密码:uqnd

## jar包替换
/usr/local/elasticsearch-6.6.0/modules/x-pack-core/x-pack-core-6.6.0.jar

#使用自带shell修改密码,将所有密码修改为123456
/usr/local/elasticsearch-6.6.0/bin/elasticsearch-setup-passwords interactive

##启动集群并测试:
curl -u elastic:123456 192.168.11.35:9200

## 解压kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.6.0-linux-x86_64.tar.gz -C /usr/local/
mv kibana-6.6.0-linux-x86_64/ kibana-6.6.0
## 进入kibana目录,修改配置文件
vim /usr/local/kibana-6.6.0/config/kibana.yml
## 修改配置如下:
server.host: "0.0.0.0"
server.name: "192.168.11.35"
elasticsearch.hosts: ["http://192.168.11.35:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"
## 启动:
/usr/local/kibana-6.6.0/bin/kibana &
## 指定配置文件启动:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## 访问:
http://192.168.0.236:5601/app/kibana (5601为kibana默认端口)

##申请license:
https://license.elastic.co/registration

## 修改申请的license, 注意license.json文件名称不能变否则认证失败
1."type":"basic" 替换为 "type":"platinum" # 基础版变更为铂金版
2."expiry_date_in_millis":1561420799999 替换为 "expiry_date_in_millis":3107746200000 # 1年变为50年

## 启动elasticsearch服务 和 kibana服务
## 进入kibana后台,Management->License Management上传修改后的token


##ik分词器:
## 安装elasticsearch-ik分词器
https://github.com/medcl/elasticsearch-analysis-ik
## 下载 https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.6.0/elasticsearch-analysis-ik-6.6.0.zip
mkdir -p /usr/local/elasticsearch-6.6.0/plugins/ik/
## 上传到/usr/local/software下 elasticsearch-analysis-ik-6.6.0.zip
## 进行解压到刚创建的/usr/local/elasticsearch-6.6.0/plugins/ik/目录:
unzip -d /usr/local/elasticsearch-6.6.0/plugins/ik/ elasticsearch-analysis-ik-6.6.0.zip

## 查看是否ok
cd /usr/local/elasticsearch-6.6.0/plugins/ik/
## 重新复权
chown -R baihezhuo:baihezhuo /usr/local/elasticsearch-6.6.0/

## 重新启动ES节点,显示如下信息代表加载ik分词器成功
[es-node01] loaded plugin [analysis-ik]
## 切换用户
su baihezhuo

## 运行es服务
/usr/local/elasticsearch-6.6.0/bin/elasticsearch -d

## 健康情况
- 查看某个节点情况:http://192.168.11.35:9200/
- 查看集群节点情况:http://192.168.11.35:9200/_cluster/health?pretty

## 正常关闭命令:
kill -sigterm pid

## head启动命令:
cd /usr/local/elasticsearch-head/ && /usr/local/elasticsearch-head/node_modules/grunt/bin/grunt server &

## head关闭命令:先找到head占用端口所在的进程,然后kill即可
netstat -tunpl | grep 9100

## head访问:http://192.168.11.35:9100?auth_user=elastic&auth_password=123456

## kibana启动:
/usr/local/kibana-6.6.0/bin/kibana &
## kibana指定配置文件启动:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &

## kibana访问:
http://192.168.11.35:5601/app/kibana (5601为kibana默认端口)

## kibana关闭命令:先找到kibana占用端口(5601)所在的进程,然后kill即可
netstat -tunpl | grep 5601

2、watcher监控告警

## 创建一个watcher:定义一个trigger,每隔5s查看一下input里的数据(与下方 "interval": "5s" 一致)
# Watcher: every 5s, search the last 30s of the error-log index for ERROR-level
# documents; if any are found, POST the newest one to the alerting webhook.
# FIX: the input index was "<error_log_collector-...>" (underscores) while the
# transform, the Kafka topic, and all later queries use "error-log-collector"
# (hyphens) — the input searched a nonexistent index, so the watcher never fired.
PUT _xpack/watcher/watch/error_log_collector_watcher
{
  "trigger": {
    "schedule": {
      "interval": "5s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              }
            }
          }
        }
      }
    }
  },

  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },

  "transform": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 1,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              }
            }
          },
          "sort": [
            {
                "currentDateTime": {
                    "order": "desc"
                }
            }
          ]
        }
      }
    }
  },
  "actions": {
    "test_error": {
      "webhook" : {
        "method" : "POST",
        "url" : "http://192.168.11.31:8001/accurateWatch",
        "body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\":\"告警级别P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"
      }
    }
 }
}

# 查看一个watcher
# 
GET _xpack/watcher/watch/error_log_collector_watcher


#删除一个watcher
DELETE _xpack/watcher/watch/error_log_collector_watcher

#执行watcher
# POST _xpack/watcher/watch/error_log_collector_watcher/_execute

#查看执行结果
GET /.watcher-history*/_search?pretty
{
  "sort" : [
    { "result.execution_time" : "desc" }
  ],
  "query": {
    "match": {
      "watch_id": "error_log_collector_watcher"
    }
  }
}

GET error-log-collector-2019.09.18/_search?size=10
{

  "query": {
    "match": {
      "level": "ERROR"
    }
  }
  ,
  "sort": [
    {
        "currentDateTime": {
            "order": "desc"
        }
    }
  ] 
}


GET error-log-collector-2019.09.18/_search?size=10
{

  "query": {
    "match": {
      "level": "ERROR"
    }
  }
  ,
  "sort": [
    {
        "currentDateTime": {
            "order": "desc"
        }
    }
  ] 
}