5月27日 18:26

Logstash 有哪些常用的插件,如何安装和管理插件?

Logstash 的强大之处在于它的插件体系——输入、过滤、输出三大类插件覆盖了从数据采集到写入目标的全链路。面试中经常问到"Logstash 有哪些常用插件""怎么安装和管理插件",下面结合实际使用场景梳理清楚。

Input 插件:数据从哪里来?

Input 插件决定 Logstash 从哪个数据源读取数据。选对 Input 插件是搭建 Pipeline 的第一步。

file —— 读日志文件

file 是最基础的 Input 插件,行为类似 tail -f,持续读取文件新增内容:

conf
input { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" } }

几个关键参数:

  • start_position:首次读取时从文件开头(beginning)还是末尾(end)开始,默认 end
  • sincedb_path:记录已读取位置的文件路径,重启后从断点续读;设为 /dev/null 则每次从头读
  • mode:默认 tail 模式持续追踪,设为 read 则读完即退出

beats —— 接收 Beats 数据

Beats 是 Elastic 官方的轻量采集器家族(Filebeat、Metricbeat 等),beats 插件是 Logstash 与 Beats 配合的标准方式:

conf
input { beats { port => 5044 } }

生产环境中,Beats 负责在各服务器上采集数据,再统一发送到 Logstash 做集中处理,这是 ELK 架构中最常见的组合。

kafka —— 从 Kafka 消费消息

当数据量大、需要缓冲或多个消费者协同工作时,Kafka 是首选的中间层:

conf
input { kafka { bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092" topics => ["app-logs", "business-events"] group_id => "logstash-consumer" consumer_threads => 4 } }

其他常用 Input 插件

插件典型场景
jdbc定时从关系型数据库拉取增量数据
http对外暴露 HTTP 接口,接收外部系统推送的数据
tcp / udp接收网络协议数据,常用于收集 syslog
syslog专门解析 syslog 格式日志
redis从 Redis List 或 Channel 读取数据
elasticsearch从 ES 中查询数据做二次处理
s3从 AWS S3 桶读取归档日志

Filter 插件:数据怎么加工?

Filter 插件负责把非结构化的原始数据转换成结构化、可搜索的字段。这是 Logstash 最核心的能力。

grok —— 解析非结构化日志

grok 是使用频率最高的 Filter 插件,通过正则表达式模式把文本拆解成字段:

conf
filter { grok { match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request_uri} %{NUMBER:response_code:int} %{NUMBER:bytes:int}" } tag_on_failure => ["_grokparsefailure"] } }

关键点:

  • Logstash 内置了大量命名模式(如 IPHOSTNAMECOMBINEDAPACHELOG),优先使用内置模式
  • tag_on_failure:匹配失败时打上标签,方便后续排查未解析的日志
  • 性能瓶颈:grok 基于正则,匹配复杂模式时 CPU 开销大,大规模数据场景下可考虑用 dissect 替代

mutate —— 字段操作

mutate 用于对字段进行增删改查,是日常配置中使用最频繁的 Filter 之一:

conf
filter { mutate { rename => { "old_field" => "new_field" } remove_field => ["unused_field"] convert => { "response_code" => "integer" "latency" => "float" } gsub => [ "message", "\", "/" ] } }

date —— 时间戳解析

日志中的时间格式五花八门,date 插件负责将字符串时间解析为 Logstash 事件的时间戳:

conf
filter { date { match => ["log_time", "yyyy-MM-dd HH:mm:ss", "ISO8601"] target => "@timestamp" timezone => "Asia/Shanghai" } }

注意:如果不确定时间格式,可以传多个模式数组,date 插件会依次尝试匹配。

其他常用 Filter 插件

插件作用
json解析 JSON 字符串为字段
csv按分隔符拆分 CSV 格式数据
geoip根据 IP 查询地理位置
useragent解析浏览器 User-Agent
ruby用 Ruby 代码实现复杂逻辑(性能敏感场景慎用)
aggregate跨事件聚合,如关联同一个请求的多条日志
dissect类似 grok 但基于固定分隔符,性能更好
drop直接丢弃不需要的事件
fingerprint给事件生成唯一标识

Output 插件:数据送到哪里去?

elasticsearch —— 写入 Elasticsearch

这是最常用的 Output 插件,生产环境中几乎必用:

conf
output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "app-logs-%{+YYYY.MM.dd}" template => "/etc/logstash/templates/es-template.json" action => "create" retry_on_conflict => 3 } }

关键参数:

  • index:支持按时间动态生成索引名,%{+YYYY.MM.dd} 按天分索引
  • actionindex(默认,覆盖写入)或 create(仅当文档不存在时写入,防止重复)
  • retry_on_conflict:版本冲突时重试次数

kafka —— 写入 Kafka

conf
output { kafka { bootstrap_servers => "kafka-broker1:9092" topic_id => "processed-logs" codec => json compression_type => "snappy" } }

其他常用 Output 插件

插件场景
file写入本地文件
http推送到外部 HTTP API
redis写入 Redis
stdout控制台输出,调试时常用
email触发告警邮件
s3归档到 AWS S3
mongodb写入 MongoDB

Codec 插件:数据的编解码

Codec 插件常被忽略,但它影响着数据的序列化方式。常用的是 json 和 multiline:

conf
input { file { path => "/var/log/app.log" codec => multiline { pattern => "^%{TIMESTAMP_ISO8601}" negate => true what => "previous" } } }

multiline 用于把 Java 堆栈信息等多行日志合并为一条事件,pattern 匹配新日志行的开头,what => "previous" 表示不匹配的行归入上一条事件。

插件安装和管理

Logstash 通过 bin/logstash-plugin 命令管理插件生命周期:

查看已安装插件

bash
# 列出所有插件 bin/logstash-plugin list # 带版本号 bin/logstash-plugin list --verbose # 按分组查看 bin/logstash-plugin list --group input # 模糊搜索 bin/logstash-plugin list '*kafka*'

安装插件

bash
# 从 RubyGems 安装 bin/logstash-plugin install logstash-output-s3 # 指定版本 bin/logstash-plugin install logstash-output-s3 --version 10.0.0 # 从本地 gem 文件安装 bin/logstash-plugin install /path/to/logstash-output-custom-1.0.0.gem

更新和卸载

bash
# 更新全部插件 bin/logstash-plugin update # 更新指定插件 bin/logstash-plugin update logstash-output-s3 # 卸载插件(Logstash 7.x+ 中部分插件为集成插件,不可卸载) bin/logstash-plugin uninstall logstash-output-s3

离线安装

生产环境通常无法访问外网,需要制作离线安装包:

bash
# 在有网络的机器上生成离线包 bin/logstash-plugin prepare-offline-pack logstash-output-s3 # 在目标机器上安装 bin/logstash-plugin install file:///path/to/logstash-offline-plugins.zip

插件配置的实战经验

条件判断让 Pipeline 更高效

不同类型的日志走不同的 Filter 逻辑,避免无关插件浪费算力:

conf
filter { if [type] == "nginx-access" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } geoip { source => "clientip" } } else if [type] == "app-error" { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" } } if [level] == "ERROR" { mutate { add_field => { "alert" => "true" } } } } }

grok 解析失败的兜底处理

grok 匹配失败是线上常见问题,必须处理:

conf
filter { grok { match => { "message" => "%{PATTERN:field}" } tag_on_failure => ["_grokparsefailure"] } if "_grokparsefailure" in [tags] { mutate { add_field => { "parse_error" => "true" } } } }

性能优化要点

  • 优先用 dissect 替代 grok:当日志格式固定时,dissect 基于分隔符拆分,性能比 grok 高出一个量级
  • 减少 mutate 调用次数:合并多个 mutate 操作到一个块内,减少事件反复处理
  • 慎用 ruby 插件:Ruby 代码执行效率远低于内置插件,只在无法用其他插件实现时才用
  • 合理配置 pipeline.workers 和 pipeline.batch.size:workers 数量通常设为 CPU 核心数,batch.size 根据事件大小调整(默认 125,大事件可适当调小)
  • 关注慢 Filter:在 logstash.yml 中开启 config.debug 和慢日志,定位瓶颈插件

自定义插件开发

当内置插件无法满足需求时(比如对接公司内部系统),就需要开发自定义插件。

创建插件骨架

bash
gem install logstash-plugin-generator logstash-plugin generate --type input --name myinput

生成的目录结构:

shell
logstash-input-myinput/ ├── lib/ │ └── logstash/ │ └── inputs/ │ └── myinput.rb ├── spec/ │ └── inputs/ │ └── myinput_spec.rb ├── Gemfile ├── logstash-input-myinput.gemspec └── README.md

核心实现

一个 Input 插件至少实现 registerrun 两个方法:

ruby
class LogStash::Inputs::Myinput < LogStash::Inputs::Base config_name "myinput" config :host, :validate => :string, :default => "0.0.0.0" config :port, :validate => :number, :required => true def register # 初始化资源,只在启动时调用一次 @server = TCPServer.new(@host, @port) end def run(queue) # 持续运行,产生事件后推入 queue loop do client = @server.accept while line = client.gets event = LogStash::Event.new("message" => line.chomp) decorate(event) queue << event end client.close end end def stop # 优雅关闭 @server.close if @server end end

构建与安装

bash
gem build logstash-input-myinput.gemspec bin/logstash-plugin install logstash-input-myinput-1.0.0.gem

编写测试

ruby
require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/myinput" describe LogStash::Inputs::Myinput do let(:config) { { "port" => 9999 } } it "registers without error" do input = described_class.new(config) expect { input.register }.not_to raise_error end end

插件版本管理的注意事项

  • 集成插件不可卸载:Logstash 7.x 之后,Kafka、Beats 等常用插件被合并为集成插件(logstash-integration-kafka),无法通过 uninstall 移除
  • 锁定版本:在 Gemfile 中指定版本避免升级引入兼容问题:gem "logstash-output-s3", "~> 10.0"
  • 升级策略bin/logstash-plugin update 默认只升级小版本和补丁版本,不会跨大版本升级,降低破坏性变更风险
  • 查看插件版本bin/logstash-plugin list --verbose | grep 插件名

掌握 Logstash 插件体系的关键在于理解 Input-Filter-Output 的数据流模型,以及每个插件在链路中的定位。日常使用中,grok 和 mutate 是最需要熟练掌握的 Filter 插件,elasticsearch output 是最核心的输出插件,而插件管理命令则保证你能灵活扩展和维护 Pipeline。

标签:Logstash