logstash配置
目录:
shipper阶段
中文乱码
可以使用input中的codec=>plain转码
# Declare the source text encoding as GB2312 so Chinese log lines are not garbled.
codec => plain { charset => "GB2312" }
用于将日志传入kafka
# Shipper input: tail the nginx access log of api.chuchujie.com.
input {
  file {
    # Tag events so the output section can route them by type.
    type => "ads_ads_ng-access-log"
    # The same log file name is listed under two candidate directories.
    path => [
      "/home/ec2-user/logs/nginx/api.chuchujie.com.access.log",
      "/app/webserver/logs/nginx/api.chuchujie.com.access.log"
    ]
    # Begin at the end of the file when it is seen for the first time.
    start_position => "end"
    # File in which the current read position is persisted.
    sincedb_path => "/tmp/.ads_ads_ng-access-log.pos"
  }
}
# Shipper output: forward the tagged nginx access-log events to Kafka.
output {
  if [type] == "ads_ads_ng-access-log" {
    kafka {
      topic_id => "ads_ads_ng-access-log"
      # NOTE(review): port 80 is unusual for a Kafka broker - confirm this address.
      bootstrap_servers => "10.40.0.39:80"
      # string (optional), one of ["none", "gzip", "snappy"], default: "none"
      compression_type => "snappy"
      # Each Kafka message is the host name directly followed by the raw log line.
      # NOTE(review): no separator is visible between %{host} and %{message};
      # a delimiter character may have been lost from this copy - confirm.
      codec => plain {
        format => "%{host}%{message}"
      }
    }
  }
}
这里可以配置多个output位置,类型可以重复,例如同时传入多个kafka
index阶段
对数据进行处理
# Indexer input: consume the shipped access-log topic from Kafka.
input {
  kafka {
    # Events read from this topic are tagged "ads-access" for the filter/output stages.
    type => "ads-access"
    topic_id => "ads_ads_ng-access-log"
    group_id => "elk"
    # ZooKeeper address used by the consumer.
    zk_connect => "10.40.0.39:2181"
    consumer_threads => 2
    consumer_restart_on_error => true
  }
}
# Indexer filter for "ads-access" events:
#   1. cut the raw log line into the named columns listed in @kname,
#   2. split the request line into verb / request / httpversion and derive request_path,
#   3. normalise a few fields and drop HEAD requests,
#   4. extract selected URL-encoded JSON parameters from the query string (GET)
#      or from the request body (every other verb).
filter {
  if [type] == "ads-access" {
    # Pair the pieces of `message` with the column names in @kname, in order,
    # and merge them into the event. `message` is removed afterwards and
    # module/function start out as "-".
    # NOTE(review): split('') cuts the message into single characters. The real
    # column separator (it has to match whatever the shipper and the nginx log
    # format put between fields) appears to be missing from this copy - confirm
    # and restore it before use.
    ruby {
      init => "@kname = ['host','elb','ident','auth','timestamp','requestmethod','response','request_times','upstream_time','bytes','referrer','agent','xforwardedfor','request_body']"
      code => "
        new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split(''))])
        new_event.remove('@timestamp')
        event.append(new_event)"
      remove_field => [ "message" ]
      add_field => {
        "module" => "-"
        "function" => "-"
      }
    }

    # Split requestmethod on spaces into verb, request and httpversion.
    if [requestmethod] {
      ruby {
        init => "@kname = ['verb','request','httpversion']"
        code => "event.append(Hash[@kname.zip(event['requestmethod'].split(' '))])"
      }
    }

    # request_path is the part of `request` in front of the first '?'.
    if [request] {
      ruby {
        init => "@kname = ['request_path']"
        code => "event.append(Hash[@kname.zip(event['request'].split('?'))])"
      }
    }

    mutate {
      # Overwrite agent and path with empty strings.
      replace => ["agent", ""]
      replace => ["path", ""]
      # Remove double-quote characters from verb and httpversion.
      gsub => ["verb",'"','']
      gsub => ["httpversion",'"','']
      # Disabled conversion, kept from the original. Note that it names
      # "request_time" while the parsed column is called "request_times".
      #convert => ["request_time", "float"]
    }

    # HEAD requests are discarded.
    if [verb] == "HEAD" {
      drop {}
    }

    # An upstream_time of "-" is replaced with "0.000".
    ruby {
      code => "event['upstream_time'] = '0.000' if event['upstream_time'] == '-';"
    }

    mutate {
      remove_field => ["requestmethod"]
    }

    # Parameter extraction. This single filter replaces sixteen copies of the
    # same one-line ruby filter (eight for GET, eight for every other verb):
    # the per-field regexes live in one table and the text to search is chosen
    # by verb. Patterns, capture group and assignment order are unchanged.
    # The patterns look for URL-encoded JSON pairs, e.g. module%22%3A%22<value>.
    # When a pattern does not match, the field is assigned nil; for module and
    # function this also replaces the "-" default set at the top of this block.
    ruby {
      init => "
        @param_patterns = {
          'module'      => /(module%22%3A%22)([0-9a-zA-Z_]+)/,
          'function'    => /(function%22%3A%22)([0-9a-zA-Z_]+)/,
          'packageName' => /(packageName%22%3A%22)([0-9a-zA-Z.]+)/,
          'platform'    => /(platform%22%3A%22)([0-9a-zA-Z_]+)/,
          'version'     => /(version%22%3A%22)([0-9a-zA-Z.]+)/,
          'channel'     => /(channel%22%3A%22)([0-9a-zA-Z_]+)/,
          'gender'      => /(gender%22%3A%22)([0-9a-zA-Z]+)/,
          'net'         => /(net%22%3A%22)([0-9a-zA-Z]+)/
        }"
      code => "
        source = event['verb'] == 'GET' ? event['request'] : event['request_body']
        if source.class == String
          @param_patterns.each { |field, pattern| event[field] = source[pattern, 2] }
        end"
    }
  }
}
# Indexer output: write parsed access-log events into daily Elasticsearch indices.
output {
  if [type] == "ads-access" {
    elasticsearch {
      hosts => ["10.40.112.9"] # qbj3-op-es
      action => "index"
      # One index per day, named from the event timestamp.
      index => "ads.access.log-%{+YYYY.MM.dd}"

      # Index template: loaded from the local JSON file, registered under
      # template_name, and overwritten if a template with that name exists.
      manage_template => true
      template => "/home/ec2-user/op/op-logstash/conf/.template/ads-access-v2.json"
      template_name => "ads-api-access"
      template_overwrite => true

      # Batching, retry, timeout and worker settings.
      flush_size => 100
      idle_flush_time => 1
      retry_max_interval => 2
      timeout => 2
      workers => 20
    }
  }
}
上边filter的后半部分根据请求方法是否为GET,分别从request(GET)或request_body(其他方法,例如POST)中通过正则的方式匹配字段
template
模板文件内容如下。其中template字段是索引名的匹配模式,需要能匹配上边output中index生成的索引名;上边的template_name是该模板在ES中注册的名称
{
"template": "ads.access.log-*",
"settings": {
"number_of_shards": 6,
"number_of_replicas": 0
},
"mappings": {
"ads-api-access": {
"properties": {
"@timestamp": {
"type": "date",
"format": "dateOptionalTime"
},
"@version": {
"index": true,
"type": "text"
},
"agent": {
"index": true,
"type": "text"
},
"auth": {
"index": true,
"type": "text"
},
"bytes": {
"index": true,
"type": "integer"
},
"elb": {
"index": true,
"type": "text"
},
"host": {
"index": true,
"type": "keyword"
},
"httpversion": {
"index": true,
"type": "text"
},
"ident": {
"index": true,
"type": "text"
},
"message": {
"index": true,
"type": "text"
},
"path": {
"index": true,
"type": "text"
},
"referrer": {
"index": true,
"type": "text"
},
"request": {
"index": true,
"type": "text"
},
"request_body": {
"index": true,
"type": "text"
},
"request_times": {
"index": true,
"type": "double"
},
"upstream_time": {
"index": true,
"type": "double"
},
"response": {
"index": true,
"type": "integer"
},
"tags": {
"index": true,
"type": "text"
},
"timestamp": {
"index": true,
"type": "date",
"format": "[dd/MMM/yyyy:HH:mm:ss Z]"
},
"type": {
"index": true,
"type": "text"
},
"verb": {
"index": true,
"type": "text"
},
"module": {
"index": true,
"type": "keyword"
},
"function": {
"index": true,
"type": "keyword"
},
"xforwardedfor": {
"index": true,
"type": "text"
},
"packageName": {
"index": true,
"type": "text"
},
"platform": {
"index": true,
"type": "text"
},
"version": {
"index": true,
"type": "text"
},
"channel": {
"index": true,
"type": "text"
},
"gender": {
"index": true,
"type": "text"
},
"net": {
"index": true,
"type": "text"
},
"request_path": {
"index": true,
"type": "text"
}
}
}
}
}
更多操作
grok进行匹配
# Keep only ERROR lines of the production log, extracting `ts` and `level` with grok.
filter {
  if [type] == "ads_ads_production-log" {
    # Pair the pieces of `message` with host / message_body and merge them into
    # the event; `message` is removed afterwards.
    # NOTE(review): split('') cuts the message into single characters; the real
    # separator between host and body appears to be missing from this copy - confirm.
    ruby {
      init => "@kname = ['host','message_body']"
      code => "
        new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split(''))])
        new_event.remove('@timestamp')
        event.append(new_event)"
      remove_field => [ "message" ]
    }

    # ts    = leading run of digits
    # level = the third whitespace-separated token (captured with its trailing
    #         punctuation, e.g. "ERROR:")
    grok {
      match => ["message_body", "(?<ts>\d*)%{SPACE}%{NOTSPACE}%{SPACE}(?<level>%{NOTSPACE})"]
    }

    # Anything whose level is not exactly "ERROR:" is discarded.
    if [level] != "ERROR:" {
      drop {}
    }
  }
}
对于形如 1548317446 --- ERROR: COLD DATA:[]....... 的日志,可以切分出ts字段1548317446和level字段ERROR:。
不过level末尾的冒号还可以再通过mutate的gsub方式去掉
使用mutate切分message并映射到对应字段
# Split "-=-"-delimited DNS log lines into named fields.
filter {
  if [type] == "ads_nli_dns-log" {
    # Step 1: turn `message` into an array by splitting on the "-=-" delimiter.
    mutate {
      split => ["message","-=-"]
    }

    # Step 2: copy each array element into its own field, by position.
    # Fix: the reference for element 8 was written "%{[message][8}" (missing
    # closing bracket), which is not a valid field reference, so
    # receivedResponseAtMillis could not resolve to the array element.
    # NOTE(review): add_field on a field that already exists (e.g. host) appends
    # to it rather than overwriting - confirm that is what is wanted here.
    mutate {
      add_field => {
        "datetime" => "%{[message][0]}"
        "unix_datetime" => "%{[message][1]}"
        "dnsServer" => "%{[message][2]}"
        "dnsResult" => "%{[message][3]}"
        "ip" => "%{[message][4]}"
        "location" => "%{[message][5]}"
        "url" => "%{[message][6]}"
        "sentRequestAtMillis" => "%{[message][7]}"
        "receivedResponseAtMillis" => "%{[message][8]}"
        "networkOperators" => "%{[message][9]}"
        "networkType" => "%{[message][10]}"
        "clientType" => "%{[message][11]}"
        "strategyLevel" => "%{[message][12]}"
        "host" => "%{[message][13]}"
      }
    }

    # Step 3: blank out literal NULL markers in the two timing fields and drop
    # the now-redundant message array.
    mutate {
      gsub => ["sentRequestAtMillis", 'NULL', '']
      gsub => ["receivedResponseAtMillis", 'NULL', '']
      remove_field => ["message"]
    }
  }
}
exec进行服务监测
# Service monitoring input: run the PHP heart-check script periodically;
# its output becomes the event message.
input {
  exec {
    # Interval between runs.
    interval => 3
    command => "php /home/ec2-user/swoole/master/current/Manager/Heartcheck.php"
  }
}
# Heart-check results that start with "success" are discarded; every other
# result is kept and its message is prefixed with "@all" and the host name.
filter {
  if [message] =~ /^success.*/ {
    drop {}
  } else {
    mutate {
      replace => ["message", "@all %{host} %{message}"]
    }
  }
}
# Every event that reaches the output stage (i.e. was not dropped by the filter)
# triggers a forced restart through server.sh.
output {
  exec {
    command => "/bin/bash /home/ec2-user/swoole/master/current/Manager/server.sh restart force"
  }
}
geoip获取IP所在地址相关信息
# Look up location information for the address stored in the `clientip` field.
geoip {
  source => "clientip"
  # Local GeoLiteCity database file used for the lookup.
  database => "/home/ec2-user/op/op-logstash/lib/GeoLiteCity.dat"
  # Restrict the lookup result to these fields.
  fields => ["city_name", "ip" ,"region_name"]
}