Skip to content

Logstash 保存不同类型的日志到各自的 Elasticsearch 索引

Logstash 的配置文件由三部分组成 inputfilteroutput,每个部分都可以包含一个或多个插件设置。

groovy
input {
  ...
}

filter {
  ...
}

output {
  ...
}

多个 file input 设置的示例

groovy
input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

更多插件设置请参考官方文档:Input Plugins, Output Plugins, Filter Plugins

如果是一份日志同时保存到两台 Elasticsearch,只需要在 output 中添加两个 elasticsearch plugin 设置就行了。

groovy
output {
    elasticsearch {
        hosts => ["http://192.168.0.1:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
    }

    elasticsearch {
        hosts => ["http://192.168.0.2:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
    }
}

如果需要根据某些条件保存到不同的索引,则需要写一些判断条件。
Logstash 的配置文件支持如下格式的语法:

groovy
if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

其中 EXPRESSION 支持如下运算符:

  • 比较运算符

    • equality: ==, !=, <, >, <=, >=
    • regexp: =~, !~ (checks a pattern on the right against a string value on the left)
    • inclusion: in, not in
  • 布尔运算符

    • and : 与
    • or : 或
    • nand : 与非(有 false 则 true,全 true 则 false)
    • xor : 异或(只有在两个比较的值不同时其结果才是 true,否则结果为 false)
  • 一元运算符

    • ! : 非

另外,在判断条件中访问日志中的字段需要使用中括号 [fieldname] 的形式。

groovy
filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}

如果是在 Logstash 中调用 sprintf format 输出的字段时,需使用 %{fieldname} 的形式,多个字段时可以使用 %{[fieldname1][fieldname2]}

groovy
output {
  statsd {
    increment => "apache.%{[response][status]}"
  }
}

下面是根据需求写的配置文件。
优先将 typeparameter 的日志单独输出到保存参数的索引,然后根据日志的 level 输出到各自的索引。
如果是程序的 Exception 则不会包含 typelevel 字段,判断是否包含 Exception 字符,如果包含则输出到记录错误消息的索引。
最后的 else 则是输出到默认的 info 索引。
每个索引以月为单位,每月创建一个单独的索引,以便将来删除过期的日志信息。
需要注意的是 elasticsearch 插件的 index 字段不支持大写字母,所以这里增加了一个 lowercasefilter,将 typelevel 字段转成了小写。

groovy
input {
    rabbitmq {
        type => "log_type"
        durable => true
        exchange => "logstash"
        exchange_type => "topic"
        key => "service.#"
        host => "192.168.0.1"
        port => 5672
        user => "username"
        password => "password"
        queue => "log_queue"
        auto_delete => false
        tags => ["service"]
    }
}

filter {
    mutate {
        lowercase => [ "type", "level" ]
    }
}

output {
    if [type] and [type] == "parameter" {
        elasticsearch {
            hosts => ["http://192.168.0.1:9200"]
            index => "logstash-parameter-%{+YYYY.MM}"
        }
    } else if [level] and [level] != "" {
        elasticsearch {
            hosts => ["http://192.168.0.1:9200"]
            index => "logstash-%{level}-%{+YYYY.MM}"
        }
    } else if [message] and "Exception" in [message] {
        elasticsearch {
            hosts => ["http://192.168.0.1:9200"]
            index => "logstash-error-%{+YYYY.MM}"
        }
    } else {
        elasticsearch {
            hosts => ["http://192.168.0.1:9200"]
            index => "logstash-info-%{+YYYY.MM}"
        }
    }
}

参考