logstash配置

时间:Jan. 24, 2019 分类:

目录:

shipper阶段

中文乱码

可以使用input中的codec=>plain转码

# Place inside an input plugin: read the source text as GB2312.
codec => plain {
    charset => "GB2312"
}

用于将日志传入kafka

# Shipper input: tail the nginx access log from either of two install paths.
input {
    file {
        # The type set here is what the shipper's output conditional checks.
        type => "ads_ads_ng-access-log"
        path => [
            "/home/ec2-user/logs/nginx/api.chuchujie.com.access.log",
            "/app/webserver/logs/nginx/api.chuchujie.com.access.log"
        ]
        # Begin at the end of the file; read offsets are kept in sincedb_path.
        start_position => "end"
        sincedb_path => "/tmp/.ads_ads_ng-access-log.pos"
    }
}

# Shipper output: forward events of type ads_ads_ng-access-log to Kafka.
output {
    if [type] == "ads_ads_ng-access-log" {
        kafka {
            bootstrap_servers => "10.40.0.39:80"
            topic_id => "ads_ads_ng-access-log"
            # string (optional), one of ["none", "gzip", "snappy"], default: "none"
            compression_type => "snappy"
            # The payload is the host value followed directly by the raw log line.
            codec => plain {
                format => "%{host}%{message}"
            }
        }
    }
}

这里可以配置多个output位置,类型可以重复,例如同时传入多个kafka

index阶段

对数据进行处理

# Indexer input: consume the shipped access-log topic from Kafka.
input {
    kafka {
        zk_connect => "10.40.0.39:2181"
        topic_id => "ads_ads_ng-access-log"
        group_id => "elk"
        consumer_threads => 2
        consumer_restart_on_error => true
        # Consumed events are given this type; the filter and output conditionals test it.
        type => "ads-access"
    }
}
# Indexer filter: break the raw payload of ads-access events into named fields,
# normalise a few of them, drop HEAD requests, and extract API parameters
# (module, function, packageName, ...) with regular expressions.
filter {
    if [type] == "ads-access" {
        # Pair the pieces of message positionally with the names in @kname and
        # merge the resulting fields into the event. The temporary event's
        # @timestamp is removed before the merge (presumably so the original
        # event's @timestamp is left untouched).
        # NOTE(review): split('') splits a Ruby string into single characters;
        # the real field delimiter was most likely a character that got lost
        # when this config was published - confirm against the nginx log_format.
        ruby {
            init => "@kname = ['host','elb','ident','auth','timestamp','requestmethod','response','request_times','upstream_time','bytes','referrer','agent','xforwardedfor','request_body']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split(''))])
                new_event.remove('@timestamp')
                event.append(new_event)"
            # Drop the raw line and seed module/function with "-" as a default.
            remove_field => [ "message" ]
            add_field => { "module" => "-" }
            add_field => { "function" => "-" }

        }
        # Split requestmethod on spaces into verb, request and httpversion.
        if [requestmethod] {
            ruby{
                init=> "@kname = ['verb','request','httpversion']"
                code=> "event.append(Hash[@kname.zip(event['requestmethod'].split(' '))])"
            }
        }
        # @kname has a single name, so only the part of request before the
        # first '?' is kept, as request_path.
        if [request] {
            ruby{
                init=> "@kname = ['request_path']"
                code=> "event.append(Hash[@kname.zip(event['request'].split('?'))])"
            }
        }
        # Blank out agent and path, and strip double quotes from verb and httpversion.
        mutate {
            replace => ["agent", ""]
            replace => ["path", ""]
            gsub => ["verb",'"','']
            gsub => ["httpversion",'"','']
            #convert => ["request_time", "float"]
        }
        # HEAD requests are not indexed.
        if [verb] == "HEAD" {
            drop {}
        }
        # Replace the "-" placeholder in upstream_time with the string '0.000'.
        ruby {
            code => "event['upstream_time'] = '0.000' if event['upstream_time'] == '-';"
        }
        # requestmethod has already been split into verb/request/httpversion above.
        mutate {
            remove_field => ["requestmethod"]
        }

        # Parameter extraction. Each regex looks for a URL-encoded `name":"`
        # (%22 is a double quote, %3A a colon) and stores capture group 2, the
        # value after it. The String check skips events where the source
        # field is missing or not a string.
        # GET requests: parameters are read from the request field.
        if [verb] == "GET" {
            ruby {
                code => "event['module'] = event['request'][/(module%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['function'] = event['request'][/(function%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['packageName'] = event['request'][/(packageName%22%3A%22)([0-9a-zA-Z.]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['platform'] = event['request'][/(platform%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['version'] = event['request'][/(version%22%3A%22)([0-9a-zA-Z.]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['channel'] = event['request'][/(channel%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['gender'] = event['request'][/(gender%22%3A%22)([0-9a-zA-Z]+)/,2] if event['request'].class == String;"
            }
            ruby {
                code => "event['net'] = event['request'][/(net%22%3A%22)([0-9a-zA-Z]+)/,2] if event['request'].class == String;"
            }
        }
        # Every other verb: the same parameters are read from request_body.
        else {
            ruby {
                code => "event['module'] = event['request_body'][/(module%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['function'] = event['request_body'][/(function%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['packageName'] = event['request_body'][/(packageName%22%3A%22)([0-9a-zA-Z.]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['platform'] = event['request_body'][/(platform%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['version'] = event['request_body'][/(version%22%3A%22)([0-9a-zA-Z.]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['channel'] = event['request_body'][/(channel%22%3A%22)([0-9a-zA-Z_]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['gender'] = event['request_body'][/(gender%22%3A%22)([0-9a-zA-Z]+)/,2] if event['request_body'].class == String;"
            }
            ruby {
                code => "event['net'] = event['request_body'][/(net%22%3A%22)([0-9a-zA-Z]+)/,2] if event['request_body'].class == String;"
            }
        }
    }
}

# Indexer output: write ads-access events to a date-suffixed Elasticsearch index.
output {
    if [type] == "ads-access" {
        elasticsearch {
            # Destination.
            hosts => ["10.40.112.9"] # qbj3-op-es
            action => "index"
            index => "ads.access.log-%{+YYYY.MM.dd}"

            # Index template: loaded from the file below under the given name,
            # overwriting any template already stored under that name.
            manage_template => true
            template => "/home/ec2-user/op/op-logstash/conf/.template/ads-access-v2.json"
            template_name => "ads-api-access"
            template_overwrite => true

            # Batching, retry, timeout and worker settings.
            flush_size => 100
            idle_flush_time => 1
            retry_max_interval => 2
            timeout => 2
            workers => 20
        }
    }
}

后边根据GET还是POST方法去不同的地方通过正则的方式匹配字段

template

这里的template字段对应上边的template_name字段

{
  "template": "ads.access.log",
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "ads-api-access": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "dateOptionalTime"
        },
        "@version": {
          "index": true,
          "type": "text"
        },
        "agent": {
          "index": true,
          "type": "text"
        },
        "auth": {
          "index": true,
          "type": "text"
        },
        "bytes": {
          "index": true,
          "type": "integer"
        },
        "elb": {
          "index": true,
          "type": "text"
        },
        "host": {
          "index": true,
          "type": "keyword"
        },
        "httpversion": {
          "index": true,
          "type": "text"
        },
        "ident": {
          "index": true,
          "type": "text"
        },
        "message": {
          "index": true,
          "type": "text"
        },
        "path": {
          "index": true,
          "type": "text"
        },
        "referrer": {
          "index": true,
          "type": "text"
        },
        "request": {
          "index": true,
          "type": "text"
        },
        "request_body": {
          "index": true,
          "type": "text"
        },
        "request_times": {
          "index": true,
          "type": "double"
        },
        "upstream_time": {
          "index": true,
          "type": "double"
        },
        "response": {
          "index": true,
          "type": "integer"
        },
        "tags": {
          "index": true,
          "type": "text"
        },
        "timestamp": {
          "index": true,
          "type": "date",
          "format": "[dd/MMM/yyyy:HH:mm:ss Z]"
        },
        "type": {
          "index": true,
          "type": "text"
        },
        "verb": {
          "index": true,
          "type": "text"
        },
        "module": {
          "index": true,
          "type": "keyword"
        },
        "function": {
          "index": true,
          "type": "keyword"
        },
        "xforwardedfor": {
          "index": true,
          "type": "text"
        },
        "packageName": {
          "index": true,
          "type": "text"
        },
        "platform": {
          "index": true,
          "type": "text"
        },
        "version": {
          "index": true,
          "type": "text"
        },
        "channel": {
          "index": true,
          "type": "text"
        },
        "gender": {
          "index": true,
          "type": "text"
        },
        "net": {
          "index": true,
          "type": "text"
        },
        "request_path": {
          "index": true,
          "type": "text"
        }
      }
    }
  }
}

更多操作

gork进行匹配

# For events of type ads_ads_production-log: split the payload into host and
# message_body, parse ts and level out of message_body, and keep only events
# whose level is "ERROR:".
filter {
    if [type] == "ads_ads_production-log" {
        # Pair the pieces of message positionally with host and message_body
        # and merge them into the event, then drop the raw message.
        # NOTE(review): split('') splits a Ruby string into single characters;
        # the real delimiter between host and body was most likely a character
        # that got lost when this config was published - confirm.
        ruby {
            init => "@kname = ['host','message_body']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split(''))])
                new_event.remove('@timestamp')
                event.append(new_event)"
            remove_field => [ "message" ]
        }
        # ts = leading run of digits; one non-space token is skipped; level =
        # the next non-space token (any trailing punctuation stays in level).
        grok {
            match => ["message_body", "(?<ts>\d*)%{SPACE}%{NOTSPACE}%{SPACE}(?<level>%{NOTSPACE})"]
        }
        # The comparison includes the trailing colon captured into level.
        if [level] != "ERROR:" {
            drop {}
        }
    }
}

对于1548317446 --- ERROR: COLD DATA:[].......的日志可以切分出ts字段1548317446和level字段ERROR:,不过这边level还可将那个符号通过gsub方式去掉

mutate字段对应

# For events of type ads_nli_dns-log: split the "-=-"-delimited message and
# copy each positional part into its own named field.
filter {
    if [type] == "ads_nli_dns-log" {
        # Turn message into an array of its "-=-"-separated parts.
        mutate {
            split => ["message", "-=-"]
        }
        # Map array positions to field names. Every reference must use the
        # complete %{[message][N]} form; a malformed reference is not resolved
        # (the original entry for index 8 was missing its closing bracket).
        # NOTE(review): if the event already carries a host field, add_field
        # is expected to append to it rather than overwrite it - confirm.
        mutate {
            add_field => {
                "datetime" => "%{[message][0]}"
                "unix_datetime" => "%{[message][1]}"
                "dnsServer" => "%{[message][2]}"
                "dnsResult" => "%{[message][3]}"
                "ip" => "%{[message][4]}"
                "location" => "%{[message][5]}"
                "url" => "%{[message][6]}"
                "sentRequestAtMillis" => "%{[message][7]}"
                "receivedResponseAtMillis" => "%{[message][8]}"
                "networkOperators" => "%{[message][9]}"
                "networkType" => "%{[message][10]}"
                "clientType" => "%{[message][11]}"
                "strategyLevel" => "%{[message][12]}"
                "host" => "%{[message][13]}"
            }
        }
        # Blank out the literal NULL placeholder in both timing fields (one
        # gsub array of field/pattern/replacement triples), then drop the
        # raw message array.
        mutate {
            gsub => [
                "sentRequestAtMillis", 'NULL', '',
                "receivedResponseAtMillis", 'NULL', ''
            ]
            remove_field => ["message"]
        }
    }
}

exec进行服务监测

# Run the PHP heartbeat check periodically; each run produces one event.
input {
    exec {
        # Interval between runs, in seconds.
        interval => 3
        command => "php /home/ec2-user/swoole/master/current/Manager/Heartcheck.php"
    }
}

# Discard events whose message starts with "success"; prefix every other
# message with "@all" and the host.
filter {
    if [message] =~ /^success.*/ {
        drop {}
    } else {
        mutate {
            replace => { "message" => "@all %{host} %{message}" }
        }
    }
}

# Every event that reaches the output stage runs the forced-restart script.
output {
    exec {
        command => "/bin/bash /home/ec2-user/swoole/master/current/Manager/server.sh restart force"
    }
}

geoip获取IP所在地址相关信息

    # Look up the address held in clientip in the given database and keep
    # only the listed geoip fields.
    geoip {
        database => "/home/ec2-user/op/op-logstash/lib/GeoLiteCity.dat"
        source => "clientip"
        fields => ["city_name", "ip", "region_name"]
    }