FileBeat+LogStash实现MySQL慢查询日志解析

背景是一个大型营销系统经常出现mysql的慢查询,导致线上服务频繁出现故障,为了查看是哪些sql有问题,并且要支持各种维度的统计查询,所以使用FileBeat+LogStash+ElasticSearch+Kibana实现此需求。本文仅描述如何配置FileBeat和LogStash实现MySQL慢查询日志解析。

FileBeat配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
filebeat.inputs:
- type: log
  enabled: true
  # Skip files whose mtime is older than this span (effectively "read everything")
  ignore_older: 30000h
  # MySQL slow-query log directory; * wildcard matches multiple files
  paths:
    - /opt/slow-sql/*.log
  # NOTE(review): document_type was removed in filebeat 6.x (5.x only) — confirm
  # the target filebeat version; on 6.x use fields/tags instead.
  document_type: mysqlslow
  # Fold every line that does NOT start with "# User@Host: " into the previous
  # event, so each multi-line slow-query entry becomes a single document.
  multiline:
    pattern: "^# User@Host: "
    negate: true
    match: after
  tail_files: false

output.logstash:
  # logstash address (deployed on the same host)
  hosts: ["127.0.0.1:5044"]

LogStash配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
input {
  # Events are pushed by Filebeat over the beats protocol
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  grok {
    # Four patterns, tried in order, covering every combination of the
    # optional "# Schema:" header line and the optional "use <db>;" statement.
    # with dbname, with schema
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # without dbname, with schema
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Schema: (?<schema>\w+)([\s\S]*)\s+#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # with dbname, without schema
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)use\s(?<dbname>\w+);([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    # without dbname, without schema
    match => {"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s@\s*\[%{IP:clientip}\]\s*([\s\S]*)#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*([\s\S]*)SET timestamp=%{NUMBER:sql_time:int}"}
    overwrite => ["message"]
  }

  grok {
    # Extract the server IP embedded in the log file path ("source" field)
    match => {"source" => "(?m)\s*%{IP:server_ip}"}
  }

  # Build a compact yyyymmdd tag from the SQL epoch timestamp.
  ruby {
    code => "
      require 'time'
      ts = event.get('sql_time')
      # FIX: guard against grok failure (sql_time unset would make Time.at raise),
      # and use non-bang String#delete — delete! returns nil when nothing is removed.
      event.set('date_tag', Time.at(ts).to_date.to_s.delete('-')) unless ts.nil?
    "
  }

  # Use the time the SQL actually ran as the event timestamp instead of
  # the ingestion time.
  # FIX: dropped the dead "yyyy-MM-dd HH:mm:ss" pattern (sql_time is always a
  # UNIX epoch integer) and the invalid locale "cn" (not a valid language code).
  date {
    match => ["sql_time", "UNIX"]
    target => "@timestamp"
  }

}

output {
  # Debug aid: print the parsed event to the console; disable in production.
  stdout {
    codec => rubydebug {}
  }
  # elasticsearch sink, one index per day
  elasticsearch {
    hosts => "localhost:9200"
    # index name, e.g. slow-sql-2019.01.01
    index => "slow-sql-%{+YYYY.MM.dd}"
  }
}

一键安装ELFK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/bin/bash
# Auto-install ELFK (Elasticsearch + Logstash + Filebeat + Kibana).
# Print the author banner (quoted heredoc: no expansion inside).
cat <<'BANNER'

#-----------------------------------------
#@Author: linvaux
#@Email: linvaux@outlook.com
#@Desc: Auto install ELFK
#----------------------------------------

BANNER
# Log an informational message in green to stdout.
# printf's %b interprets backslash escapes in the argument, matching echo -e.
INFO()
{
  printf '\033[0;32m[INFO] %b \033[0m\n' "$*"
}

# Log an error message in red to stdout.
# printf's %b interprets backslash escapes in the argument, matching echo -e.
ERROR()
{
  printf '\033[0;31m[ERROR] %b \033[0m\n' "$*"
}

# Log a warning message in yellow to stdout.
# printf's %b interprets backslash escapes in the argument, matching echo -e.
WARN()
{
  printf '\033[0;33m[WARN] %b \033[0m\n' "$*"
}

# Configure a docker-hub registry mirror ("booster") in /etc/docker/daemon.json.
# FIX: both branches of the original if/else wrote the same file (duplication),
# and "Backup ... success!" was logged BEFORE the mv actually ran.
booster()
{
  local daemon="/etc/docker/daemon.json"
  if [[ -e ${daemon} ]];then
    # back up the existing config before overwriting it
    mv ${daemon} ${daemon}.bak
    INFO Backup ${daemon} success!
  fi
  # ensure /etc/docker exists even on a fresh host
  mkdir -p "$(dirname "${daemon}")"
  echo "{\"registry-mirrors\" : [\"https://hub-mirror.c.163.com\"]}" > ${daemon}
  INFO Config docker-hub booster success!
}

# Ensure docker and a Java 8 runtime are installed, installing them if missing.
# Supports Ubuntu/Debian (apt) and CentOS (yum); aborts on any other distro.
check_env()
{
  # command -v is the portable replacement for `which`
  if [[ -z "$(command -v docker)" ]]
  then
    WARN No docker were found,try to install!
    INFO Start to install docker
    # /etc/os-release defines $ID (distro identifier)
    source /etc/os-release
    # BUG FIX: "debain" was a typo, so Debian hosts fell into the unsupported branch
    if [[ "$ID" == "ubuntu" ]] || [[ "$ID" == "debian" ]]
    then
      apt update
      apt install curl wget -y
      curl -fsSL https://get.docker.com | sh
      booster
      systemctl daemon-reload
      systemctl restart docker
      if [[ -z "$(command -v java)" ]];then
        apt install openjdk-8-jdk -y
      fi
    elif [[ "$ID" == "centos" ]]
    then
      yum update -y
      yum install wget curl net-tools -y
      curl -fsSL https://get.docker.com | sh
      booster
      systemctl daemon-reload
      systemctl restart docker
      if [[ -z "$(command -v java)" ]];then
        yum install java-1.8.0-openjdk -y
      fi
    else
      ERROR Could not support $ID platform!
      exit 1
    fi
  fi
}

# Pull and run a single-node Elasticsearch 6.5.4 container (security disabled,
# CORS open for kibana/head-style tooling).
install_elasticsearch()
{
  INFO Start to install elasticsearch
  # ES requires a high mmap count.
  # FIX: guard the append so re-running the script does not stack duplicate lines.
  if ! grep -q '^vm.max_map_count=655360$' /etc/sysctl.conf; then
    echo "vm.max_map_count=655360" >> /etc/sysctl.conf
  fi
  sysctl -p
  docker pull docker.elastic.co/elasticsearch/elasticsearch:6.5.4
  docker run -d --restart=always -p 9200:9200 -p 9300:9300 --name es -h es \
    -e cluster.name=kiki -e node.name=node1 \
    -e http.cors.enabled=true -e http.cors.allow-origin="*" \
    -e xpack.security.enabled=false \
    docker.elastic.co/elasticsearch/elasticsearch:6.5.4
}

# Pull and run Kibana 6.5.4 pointed at the local Elasticsearch.
install_kibana()
{
  INFO Start to install kibana
  docker pull kibana:6.5.4;
  # FIX: dropped "-p 5601:5601" — with --network=host port mappings are ignored
  # (docker prints a warning); the container binds host port 5601 directly.
  docker run --restart=always --name kibana -e ELASTICSEARCH_URL=http://127.0.0.1:9200 --network=host -d kibana:6.5.4
}

# Install filebeat and logstash from the official Elastic 6.x repositories.
# Supports Ubuntu/Debian (apt) and CentOS (yum); aborts on any other distro.
install_filebeat_and_logstash()
{
  INFO Start to install filebeat and logstash
  # /etc/os-release defines $ID (distro identifier)
  source /etc/os-release
  # BUG FIX: "debain" was a typo, so Debian hosts fell into the unsupported branch
  if [[ "$ID" == "ubuntu" ]] || [[ "$ID" == "debian" ]]
  then
    wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
    apt-get install apt-transport-https -y
    echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list
    apt-get update && apt install filebeat logstash -y
    if [[ $? -ne 0 ]]
    then
      ERROR Install filebeat and logstash failed!
      exit 1
    fi
  elif [[ "$ID" == "centos" ]]
  then
    rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
    # write the yum repo definition; lines start at column 0 so the ini stays valid
    echo -e '
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
' > /etc/yum.repos.d/elastic.io.repo
    yum makecache && yum install filebeat logstash -y
    if [[ $? -ne 0 ]]
    then
      ERROR Install filebeat and logstash failed!
      exit 1
    fi
  else
    ERROR Could not support $ID platform!
    exit 1
  fi

}

# Install our filebeat.yml (backing up the stock one) and restart the service.
# Exits the script if the stock config is missing or the restart fails.
start_filebeat()
{
  INFO Start to call filebeat
  local filebeat_yaml="/etc/filebeat/filebeat.yml"
  # guard clause: a missing stock config means filebeat isn't installed properly
  if [[ ! -f ${filebeat_yaml} ]];then
    ERROR ${filebeat_yaml} not found, please check!
    exit 1
  fi
  mv ${filebeat_yaml} ${filebeat_yaml}.bak
  cp ./filebeat.yml ${filebeat_yaml}
  if ! systemctl restart filebeat
  then
    ERROR Start filebeat failed!
    exit 1
  fi
}

# Drop our pipeline config into logstash's conf.d and restart the service.
# Exits the script if the config directory is missing or the restart fails.
start_logstash()
{
  INFO Start to call logstash
  local logstash_conf_dir="/etc/logstash/conf.d/"
  # guard clause: a missing conf.d means logstash isn't installed properly
  if [[ ! -e ${logstash_conf_dir} ]];then
    ERROR ${logstash_conf_dir} not found, please check!
    exit 1
  fi
  cp ./slow_sql_by_query.conf ${logstash_conf_dir}
  if ! systemctl restart logstash
  then
    ERROR Start logstash failed!
    exit 1
  fi
}


# Orchestrate the whole installation pipeline; refuses to do anything
# unless executed as root (package managers and systemctl require it).
run()
{
  # guard clause: bail out early for non-root users
  if [[ "root" != $(whoami) ]]
  then
    ERROR Run as root please!
    return
  fi
  INFO Start to run...
  check_env
  install_elasticsearch
  install_kibana
  install_filebeat_and_logstash
  # download_log
  start_logstash
  start_filebeat
  # check_index
  INFO Run success!
}

# Entry point: kick off the full ELFK installation pipeline.
run