5月27日 18:22

Logstash 有哪些常用过滤器?Grok 和 Mutate 怎么用?

Grok 过滤器:把非结构化日志变成结构化数据

Grok 是 Logstash 中使用频率最高的过滤器,核心能力是将一行纯文本日志拆解成有名字段的 JSON。它的底层原理是基于正则表达式的模式匹配,但 Elastic 已经预置了大量常用模式,日常使用不需要手写正则。

基本匹配

最典型的场景是解析 Apache/Nginx 访问日志。COMBINEDAPACHELOG 是内置模式,一条配置就能提取 clientip、response_code、bytes 等十几个字段:

conf
filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } }

匹配成功后,原来 message 字段中的一整行日志会被拆成 clientipidentauthtimestampverbrequesthttpversionresponsebytes 等独立字段,后续过滤器和输出插件都能直接引用。

多模式备选

实际生产中,日志格式往往不止一种。Grok 支持传入一个模式数组,按顺序依次尝试匹配,命中的第一个生效:

conf
filter { grok { match => { "message" => [ "%{COMBINEDAPACHELOG}", "%{COMMONAPACHELOG}", "%{SYSLOGBASE} %{GREEDYDATA:message}" ] } } }

这种方式比写一个超长的"万能正则"更易维护,哪条模式匹配了也更容易排查。

自定义模式

当内置模式无法满足需求时,可以在外部文件中定义自己的模式。模式文件的语法是 PATTERN_NAME regex,一行一个:

conf
filter { grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MYAPP_LOG:myapp_field}" } } }

对应的 /etc/logstash/patterns/myapp 文件内容示例:

shell
MYAPP_LOG \[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}

Grok 匹配失败怎么办

Grok 匹配失败时,Logstash 会自动给事件打上 _grokparsefailure 标签。在生产环境中,应该用条件判断捕获这些失败事件,避免脏数据静默进入 Elasticsearch:

conf
filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } if "_grokparsefailure" in [tags] { mutate { add_field => { "parse_error" => "grok failed for message" } } } }

也可以借助 Kibana 自带的 Grok Debugger 工具调试模式,避免反复重启 Logstash。

Mutate 过滤器:字段级别的加工工具

Mutate 是"万能修理工",几乎所有的字段增删改操作都能靠它完成。它不关心数据来源,只对已有字段做变换。

重命名字段

从 Beats 或其他输入源拿到的字段名不符合规范时,用 rename 统一命名:

conf
filter { mutate { rename => { "client_ip" => "source_ip" } } }

类型转换

Grok 解析出来的字段默认都是字符串类型,想做数值聚合或范围查询,必须先转换类型:

conf
filter { mutate { convert => { "response" => "integer" "request_time" => "float" } } }

这一步经常被忽略,导致 Elasticsearch 中所有字段都是 keyword 类型,数值范围查询直接失效。

删除无用字段

每个事件默认携带 message、@version、host 等字段。如果已经用 Grok 把 message 拆成了独立字段,原始 message 留着只会浪费存储:

conf
filter { mutate { remove_field => ["message", "@version", "host"] } }

替换和追加字段

replace 会覆盖已有字段或新建字段,add_field 则是在原有字段基础上追加:

conf
filter { mutate { replace => { "log_source" => "production-nginx" } add_field => { "environment" => "prod" "pipeline" => "nginx-access" } } }

gsub:正则替换字段内容

Mutate 自带 gsub 操作,可以对字段值做正则替换,不需要动用 Ruby 过滤器:

conf
filter { mutate { gsub => [ "request", "\\?.+$", "" ] } }

这会把 /api/users?page=1&size=10 替换为 /api/users,去掉查询参数部分,便于按路径做聚合统计。

大小写转换与分割

conf
filter { mutate { uppercase => ["log_level"] split => { "tags" => "," } } }

uppercase 将字段值转为大写,split 按指定分隔符将字符串拆成数组。这两个操作在数据规范化场景中很常用。

Mutate 各操作的执行顺序

Mutate 内部有固定的操作执行顺序,与你在配置中写的顺序无关:renamecopygsubuppercase/lowercasestripreplacejoinsplitmergecoerceconvertadd_fieldremove_field。如果 rename 在 convert 之后才生效,可能让类型转换的目标字段名对不上。遇到这类问题时,可以拆成两个 mutate 块来控制顺序:

conf
filter { mutate { rename => { "resp_code" => "response" } } mutate { convert => { "response" => "integer" } } }

Date 过滤器:统一时间戳格式

Logstash 用 @timestamp 作为事件的时间基准,但原始日志中的时间格式千差万别。Date 过滤器的作用就是把各种格式的时间字符串解析成 Logstash 内部的 ISO8601 时间对象。

解析多种时间格式

conf
filter { date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy-MM-dd HH:mm:ss", "ISO8601" ] } }

match 的第二个参数是格式数组,Joda-Time 格式和 ISO8601 关键字都可以混用。解析成功后,@timestamp 自动更新。

指定时区和目标字段

conf
filter { date { match => ["log_time", "yyyy-MM-dd HH:mm:ss"] timezone => "Asia/Shanghai" target => "parsed_time" } }

如果不指定 target,默认写入 @timestamp。如果只想保留解析结果但不动 @timestamp,就指定一个自定义的 target 字段。

GeoIP 过滤器:IP 地址转地理位置

GeoIP 根据 IP 地址查询 MaxMind 数据库,自动补充城市、国家、经纬度等地理信息,是做访问地图可视化的前提。

conf
filter { geoip { source => "client_ip" target => "geo" fields => ["city_name", "country_name", "location"] } }

source 指定待查询的 IP 字段,fields 限制只输出需要的地理字段,避免写入过多无用数据。注意 Logstash 默认内置了 GeoLite2 数据库,但如果需要更精确的数据,需要手动下载并指定 database 路径。

JSON 过滤器:解析嵌套 JSON 日志

现代应用的日志越来越倾向于直接输出 JSON 格式,JSON 过滤器可以把它展开成 Logstash 的事件字段:

conf
filter { json { source => "message" target => "parsed" } }

如果指定了 target,解析结果会放在 target 字段下形成嵌套结构;不指定则直接铺平到顶层。生产中建议指定 target,避免字段名冲突。解析后通常配合 mutate 删除原始 message 字段:

conf
filter { json { source => "message" target => "log" } mutate { remove_field => ["message"] } }

Useragent 过滤器:解析浏览器信息

从 HTTP 请求的 User-Agent 头中提取浏览器名称、版本、操作系统等信息:

conf
filter { useragent { source => "agent" target => "ua" } }

通常跟在 Grok 解析 Apache 日志之后使用,agent 字段就是 Grok 从日志中提取出来的 User-Agent 字符串。

CSV 过滤器:处理表格数据

CSV 过滤器用于解析逗号(或其他分隔符)分隔的文本数据:

conf
filter { csv { separator => "," columns => ["name", "age", "city"] autodetect_column_types => true } }

columns 指定每列的字段名,autodetect_column_types 让 Logstash 自动识别数值类型。如果 CSV 首行是表头,也可以省略 columns 让它自动读取。

Ruby 过滤器:处理复杂逻辑

当内置过滤器无法满足需求时,Ruby 过滤器提供了完全的编程能力:

conf
filter { ruby { code => ' status = event.get("response").to_i if status >= 400 event.tag("error") event.set("error_level", status >= 500 ? "server_error" : "client_error") end ' } }

Ruby 过滤器灵活但性能开销大,Grok 能搞定的事情不要用 Ruby。实际项目中,Ruby 过滤器多用于多字段联合计算、条件标签打标等场景。

Drop 过滤器:丢弃不需要的事件

Drop 过滤器直接丢弃整个事件,不会传到输出阶段。常见用法是过滤掉调试日志或特定来源的噪声数据:

conf
filter { if [log_level] == "DEBUG" or [message] =~ /^health check/ { drop { } } }

使用时注意加上条件判断,否则会丢掉所有事件。

过滤器的组合与顺序

实际项目中,过滤器总是组合使用的。一个典型的 Nginx 访问日志处理管线:

conf
filter { # 第一步:用 Grok 把日志拆成字段 grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } # 第二步:转换数值类型 mutate { convert => { "response" => "integer" } convert => { "bytes" => "integer" } remove_field => ["message"] } # 第三步:解析时间戳 date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] } # 第四步:补充地理位置 geoip { source => "clientip" } # 第五步:解析 User-Agent useragent { source => "agent" target => "ua" } }

过滤器的执行顺序就是配置中的书写顺序。一般遵循"先解析、再转换、后丰富"的原则:Grok 在最前面把原始文本拆开,Mutate 紧跟其后做类型转换和字段清理,Date/GeoIP/Useragent 等根据已有字段做信息补充。

日常排错建议

  • Grok 匹配失败:检查 Kibana Grok Debugger,确认模式与日志格式一致。生产环境务必处理 _grokparsefailure 标签事件。
  • Mutate 顺序问题:记住 Mutate 内部操作有固定执行顺序,不受配置书写位置影响。遇到 rename 和 convert 冲突时,拆成两个 mutate 块。
  • Date 时区偏移@timestamp 默认是 UTC 时间,查询时注意时区换算。如果业务强依赖本地时间,在 date 过滤器中指定 timezone。
  • GeoIP 数据库过旧:Logstash 内置的 GeoLite2 不会自动更新,地理信息不准确时需要手动下载最新数据库。
  • 性能瓶颈:Grok 是 CPU 密集型操作,复杂模式会导致吞吐量下降。可以考虑用 dissect 过滤器替代简单格式的 Grok 匹配,dissect 基于分隔符定位,性能更好。
标签:Logstash