一. Logstash 配置基础
Logstash 的配置文件由三个主要部分组成:input、filter 和 output。input 部分定义了日志的来源,filter 部分用于解析和处理日志数据,而 output 部分则决定了数据的去向。
一个简单的 Logstash 配置文件示例如下:
input {
file {
path => "/var/log/syslog"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "syslog-%{+YYYY.MM.dd}"
}
}
在这个示例中,Logstash 从 /var/log/syslog 文件读取日志,使用 grok 过滤器解析 syslog 格式的日志数据,并将结果输出到 Elasticsearch。
二. Filters:复杂日志处理的核心
1. Grok:提取结构化数据
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
在这个示例中,grok 使用内置的 COMBINEDAPACHELOG 模式来解析 Apache 的访问日志。Logstash 提供了大量预定义的 grok 模式,能够处理常见的日志格式。
2. Mutate:数据转换与增强
filter {
mutate {
rename => { "clientip" => "client_ip" }
convert => { "response_bytes" => "integer" }
remove_field => [ "unnecessary_field" ]
}
}
3. Date:时间戳处理
filter {
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
}
4. Ruby:自定义逻辑
filter {
ruby {
code => "event.set('combined_field', event.get('field1') + ' ' + event.get('field2'))"
}
}
filter {
aggregate {
task_id => "%{some_id}"
code => "map['count'] ||= 0; map['count'] += 1"
push_map_as_event_on_timeout => true
timeout => 120
}
}
三. 条件语句与多事件处理
filter {
if [source] == "apache_logs" {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
} else if [source] == "syslog" {
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}
}
还没有评论,来说两句吧...