# 这台logstash的作用是消费kafka中的消息,创建了和topic一样名称的消费者组
[root@nwps ~]# mv /etc/logstash/conf.d/system.conf{,.bak}
[root@nwps ~]# vim /etc/logstash/conf.d/nginx.conf
input {
  kafka {
    type => "nginxlog"
    topics => ["nginxlog"]
    bootstrap_servers => ["192.168.1.17:9092"]
    group_id => "nginxlog"           # 在kafka中创建的组名
    auto_offset_reset => "latest"    # 注意:该值需要加引号,否则logstash解析配置时会报错
    codec => "json"
  }
}
filter {
  if [type] == "nginxlog" {
    grok {
      match => {"message" => "%{COMBINEDAPACHELOG}"}
      remove_field => "message"
    }
    date {
      match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"]
    }
#   geoip {
#     source => "clientip"
#     target => "geoip"
#     database => "/etc/logstash/conf.d/GeoLite2-City.mmdb"
#     add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}"]
#     add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}"]
#   }
    mutate {
      convert => ["[geoip][coordinates]", "float"]
    }
    useragent {
      source => "agent"
      target => "userAgent"
    }
  }
}
output {
  if [type] == 'nginxlog' {
    elasticsearch {
      hosts => ["http://192.168.1.19:9200"]
      index => "logstash-nginxlog-%{+YYYY.MM.dd}"
    }
    stdout {codec => rubydebug}
  }
}
[root@nwps ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf
# 在前台启动,查看效果。如果正常的话,应该可以看到被消费的信息在前台打印出来。类似下面这样:
{
    "@timestamp" => 2020-01-27T16:41:36.459Z,
       "message" => "{\"@timestamp\":\"2020-01-28T00:41:36+08:00\",\"host\":\"192.168.1.14\",\"clientip\":\"192.168.1.109\",\"size\":\"185\",\"responsetime\":\"0.000\",\"upstreamtime\":\"-\",\"upstreamhost\":\"-\",\"http_host\":\"192.168.1.14\",\"url\":\"/wordpress\",\"domain\":\"192.168.1.14\",\"xff\":\"-\",\"referer\":\"-\",\"status\":\"301\"}",
        "source" => "/var/log/nginx/host.access.log",
        "offset" => 2514148,
        "fields" => {
        "log_topics" => "nginxlog",
              "json" => {
            "overwrite_keys" => true,
            "keys_under_root" => true
        }
    },
          "host" => {
        "name" => "nwps"
    },
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ],
          "type" => "nginxlog",
          "beat" => {
            "name" => "nwps",
        "hostname" => "nwps",
         "version" => "6.5.4"
    }
}
# 说明:tags中出现 _grokparsefailure,是因为消息本身已经是json格式,并不匹配 COMBINEDAPACHELOG 模式,属于预期现象,不影响消息被消费和写入。
--------------- kafka1 ---------------
# 到第一台kafka查看效果
[root@kafka1 bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.17:9092 --group nginxlog --describe
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST           CLIENT-ID
nginxlog  0          5407            5407            0    logstash-0-719c3537-8c50-472d-aa3b-ab0e6f3d8498  /192.168.1.14  logstash-0
nginxlog  1          5406            5406            0    logstash-0-719c3537-8c50-472d-aa3b-ab0e6f3d8498  /192.168.1.14  logstash-0
nginxlog  2          5405            5405            0    logstash-0-719c3537-8c50-472d-aa3b-ab0e6f3d8498  /192.168.1.14  logstash-0
# 查看kafka的消息堆积情况,可以加上watch命令。不知是否与上面提到的hosts文件有关,最好加上可以解析
# kafka主机名的hosts记录。
|