多个日志源filebeat配置详解
架构组合是基于filebeat、logstash、ES、kibana;
1、filebeat 配置
filebeat 是基于原先 logstash-forwarder 的源码改造出来的。换句话说:filebeat 就是新版的 logstash-forwarder,也会是 日志收集器的第一选择。
1、input
# 日志类型 - input_type: log # 日志路径 可以写多个 ,支持通配符 paths: - /home/logs/shiqc_log/*access.log #设置字符编码 encoding: utf-8 # 添加字段信息 fields: logsource: nginx_access_log logtype: nginx_dev #文档类型 document_type: shiqc_dev_123_nginxlog #每 10 秒钟扫描一次 scan_frequency: 10s # 实际读取文件时,每次读取 16384 字节 harvester_buffer_size: 16384 # 是否从文件末尾开始读取 tail_files: true # 多行日志合并 multiline.pattern: '^((\d+)\.(\d+)\.(\d+)\.(\d+)\s){2}\[' multiline.negate: true multiline.match: after
2、filter
5.0 版本后,beats 新增了简单的 filter 功能,用来完成事件过滤和字段删减:
如:
filters: - drop_event: regexp: message: "^DBG:" - drop_fields: contains: source: "test" fields: ["message"] - include_fields: fields: ["http.code", "http.host"] equals: http.code: 200 range: gte: cpu.user_p: 0.5 lt: cpu.user_p: 0.8
可用的条件判断还包括:
equals
contains
regexp
range
or
and
not
3、output
目前 beat 可以发送数据给 Elasticsearch、Logstash、File、Kafka、Redis 和 Console 六种目的地址。
这里以logstash为例:
beats 发送到 Elasticsearch 也是走 HTTP 接口。示例配置段如下:
#----------------------------- Logstash output -------------------------------- output.logstash: # The Logstash hosts hosts: ["47.96.10.91:5044"] worker: 2 loadbalance: true # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # Certificate for SSL client authentication #ssl.certificate: "/etc/pki/client/cert.pem" # Client Certificate Key #ssl.key: "/etc/pki/client/cert.key"
这里 worker
的含义,是 beat 连到每个 host 的线程数。在 loadbalance
开启的情况下,意味着有 4 个worker 轮训发送数据。
4、output字段
Filebeat 发送的日志,会包含以下字段:
beat.hostname
beat 运行的主机名beat.name
shipper 配置段设置的name
,如果没设置,等于beat.hostname
@timestamp
读取到该行内容的时间type
通过document_type
设定的内容input_type
来自 "log" 还是 "stdin"source
具体的文件名全路径offset
该行日志的起始偏移量message
日志内容fields
添加的其他固定字段都存在这个对象里面
总体配置概览:
filebeat: spool_size: 1024 # 最大可以攒够 1024 条数据一起发送出去 idle_timeout: "5s" # 否则每 5 秒钟也得发送一次 registry_file: ".filebeat" #文件读取位置记录文件,会放在当前工作目录下。所以如果你换一个工作目录执行 filebeat 会导致重复传输! config_dir: "path/to/configs/contains/many/yaml" # 如果配置过长,可以通过目录加载方式拆分配置 prospectors: # 有相同配置参数的可以归类为一个 prospector - input_type: log 除了 "log",还有 "stdin" fields: # 类似 logstash 的 add_fields logsource: YII_INFO logtype: YIIdev paths: # 指明读取文件的位置 - /home/www/temp/logs/mobile.info.log - /home/www/temp/logs/crontab.info.log - /home/www/temp/logs/api.info.log - /home/www/temp/logs/pc.info.log - /home/www/temp/logs/inside.info.*.log include_lines: ["^ERR", "^WARN"] # 只发送包含这些字样的日志 exclude_lines: ["^OK"] # 不发送包含这些字样的日志 document_type: "YII_dev_123_log" # 定义写入 ES 时的 _type 值 ignore_older: "24h" # 超过 24 小时没更新内容的文件不再监听。在 windows 上另外有一个配置叫 force_close_files,只要文件名一变化立刻关闭文件句柄,保证文件可以被删除,缺陷是可能会有日志还没读完 scan_frequency: "10s" # 每 10 秒钟扫描一次目录,更新通配符匹配上的文件列表 tail_files: false # 是否从文件末尾开始读取 harvester_buffer_size: 16384 # 实际读取文件时,每次读取 16384 字节 backoff: "1s" # 每 1 秒检测一次文件是否有新的一行内容需要读取 # 多行日志合并 multiline.pattern: '^((\d+)\.(\d+)\.(\d+)\.(\d+)\s){2}\[' multiline.negate: true multiline.match: after ... 下面可以有多个- input_type: 甚至多个prospectors #----------------------------- Logstash output -------------------------------- output.logstash: # The Logstash hosts hosts: ["47.96.10.91:5044"] # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # Certificate for SSL client authentication #ssl.certificate: "/etc/pki/client/cert.pem" # Client Certificate Key #ssl.key: "/etc/pki/client/cert.key"
下面贴出我的logstash配置
后续有时间我会在整理下 另开文章logstash配置说明
### INPUTS input { #kafka { # codec => "json" # topics_pattern => "elk-.*" # bootstrap_servers => "your.kafka.cluster.node.1:9092,your.kafka.cluster.node.2:9092" # auto_offset_reset => "latest" # group_id => "logstash-g1" #} #file { # path =>["/home/wwwlogs/*.log"] # add_field => {"appName" => "test"} # type =>"test" #} beats { port => 5044 } } ### FILTERS filter { if [fields][logtype] == "nginx"{ grok { #获取 Nginx 日志字段 match => { "message" => [ #Nginx access log 格式 '%{IPV4:clientip} - (?:%{USERNAME}|-) \[%{HTTPDATE:[@metadata][timestamp]}\] %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER} "%{NUMBER:response_code}" (?:%{NUMBER:bytes}|-) (?:%{NUMBER:response_time}|-) "(?:%{URI:http_referer}|-)" %{QS:agent}' ] } #减少冗余数据 remove_field => [ "message" ] } #通用 Nginx 访问日志 if [request] { #获取 日志 时间 date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] } mutate { #转换数据类型 convert => [ "response_code" , "integer", "bytes" , "integer", "response_time", "float" ] } #获取 参数 ruby { init => "@kname = ['url_path','url_args']" code => " new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))]) new_event.remove('@timestamp') event.append(new_event) " } if [url_path] =~ /\.js|css|jpeg|jpg|png|gif|ico|swf$/ { drop {} } if [url_args] { kv { prefix => "url_param_" source => "url_args" field_split => "&" #只存储感兴趣参数 #include_keys => [ "uid", "vn" ] remove_field => [ "url_args" ] } }else{ mutate { #减少冗余数据 remove_field => [ "url_args" ] } } #url 解码 urldecode { all_fields => true } }else{ #忽略其他请求 #drop {} } } if [fields][logsource] == "YII_ERROR"{ grok { #获取 YII_ERROR 日志字段 patterns_dir => ["./config/patterns"] match => { "message" => [ "%{IPV4:clientip} %{IPV4:clientip1} \[%{TIMESTAMP_ISO8601:time}\] %{WORD:RID} %{WORD:env} %{USERNAME:route} \"%{URIPATHPARAM}\" %{WORD:log_level} %{WORD:sysinfo} %{dataINFO:dataInfo}%{ROWINFO}" ] } #减少冗余数据 #remove_field => [ "message" ] } } if [fields][logsource] == "YII_ERROR"{ grok { #获取 YII_ERROR 日志字段 patterns_dir => ["./config/patterns"] match => { "message" => [ "%{IPV4:clientip} %{IPV4:clientip1} \[%{TIMESTAMP_ISO8601:time}\] %{WORD:RID} %{WORD:env} %{USERNAME:route} \"%{URIPATHPARAM}\" %{WORD:log_level} %{WORD:sysinfo} %{dataINFO:dataInfo}%{ROWINFO}" ] } #减少冗余数据 #remove_field => [ "message" ] } } } ### OUTPUTS 根据不同日志来源创建不同的索引 output { if [fields][logsource] == "nginx_access_log"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "nginx_access_log-%{+YYYY.MM.dd}" } } if [fields][logsource] == "nginx_error_log"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "nginx_error_log-%{+YYYY.MM.dd}" } } if [fields][logsource] == "YII_INFO"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "yii_info-%{+YYYY.MM.dd}" } } if [fields][logsource] == "YII_ERROR"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "yii_error-%{+YYYY.MM.dd}" } } #stdout { codec => rubydebug } }