Logstash 有哪些常用的插件,如何安装和管理插件?
Logstash 的强大之处在于它的插件体系——输入、过滤、输出三大类插件覆盖了从数据采集到写入目标的全链路。面试中经常问到"Logstash 有哪些常用插件""怎么安装和管理插件",下面结合实际使用场景梳理清楚。
Input 插件:数据从哪里来?
Input 插件决定 Logstash 从哪个数据源读取数据。选对 Input 插件是搭建 Pipeline 的第一步。
file —— 读日志文件
file 是最基础的 Input 插件,行为类似 tail -f,持续读取文件新增内容:
confinput { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" } }
几个关键参数:
start_position:首次读取时从文件开头(beginning)还是末尾(end)开始,默认endsincedb_path:记录已读取位置的文件路径,重启后从断点续读;设为/dev/null则每次从头读mode:默认tail模式持续追踪,设为read则读完即退出
beats —— 接收 Beats 数据
Beats 是 Elastic 官方的轻量采集器家族(Filebeat、Metricbeat 等),beats 插件是 Logstash 与 Beats 配合的标准方式:
confinput { beats { port => 5044 } }
生产环境中,Beats 负责在各服务器上采集数据,再统一发送到 Logstash 做集中处理,这是 ELK 架构中最常见的组合。
kafka —— 从 Kafka 消费消息
当数据量大、需要缓冲或多个消费者协同工作时,Kafka 是首选的中间层:
confinput { kafka { bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092" topics => ["app-logs", "business-events"] group_id => "logstash-consumer" consumer_threads => 4 } }
其他常用 Input 插件
| 插件 | 典型场景 |
|---|---|
| jdbc | 定时从关系型数据库拉取增量数据 |
| http | 对外暴露 HTTP 接口,接收外部系统推送的数据 |
| tcp / udp | 接收网络协议数据,常用于收集 syslog |
| syslog | 专门解析 syslog 格式日志 |
| redis | 从 Redis List 或 Channel 读取数据 |
| elasticsearch | 从 ES 中查询数据做二次处理 |
| s3 | 从 AWS S3 桶读取归档日志 |
Filter 插件:数据怎么加工?
Filter 插件负责把非结构化的原始数据转换成结构化、可搜索的字段。这是 Logstash 最核心的能力。
grok —— 解析非结构化日志
grok 是使用频率最高的 Filter 插件,通过正则表达式模式把文本拆解成字段:
conffilter { grok { match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request_uri} %{NUMBER:response_code:int} %{NUMBER:bytes:int}" } tag_on_failure => ["_grokparsefailure"] } }
关键点:
- Logstash 内置了大量命名模式(如
IP、HOSTNAME、COMBINEDAPACHELOG),优先使用内置模式 tag_on_failure:匹配失败时打上标签,方便后续排查未解析的日志- 性能瓶颈:grok 基于正则,匹配复杂模式时 CPU 开销大,大规模数据场景下可考虑用 dissect 替代
mutate —— 字段操作
mutate 用于对字段进行增删改查,是日常配置中使用最频繁的 Filter 之一:
conffilter { mutate { rename => { "old_field" => "new_field" } remove_field => ["unused_field"] convert => { "response_code" => "integer" "latency" => "float" } gsub => [ "message", "\", "/" ] } }
date —— 时间戳解析
日志中的时间格式五花八门,date 插件负责将字符串时间解析为 Logstash 事件的时间戳:
conffilter { date { match => ["log_time", "yyyy-MM-dd HH:mm:ss", "ISO8601"] target => "@timestamp" timezone => "Asia/Shanghai" } }
注意:如果不确定时间格式,可以传多个模式数组,date 插件会依次尝试匹配。
其他常用 Filter 插件
| 插件 | 作用 |
|---|---|
| json | 解析 JSON 字符串为字段 |
| csv | 按分隔符拆分 CSV 格式数据 |
| geoip | 根据 IP 查询地理位置 |
| useragent | 解析浏览器 User-Agent |
| ruby | 用 Ruby 代码实现复杂逻辑(性能敏感场景慎用) |
| aggregate | 跨事件聚合,如关联同一个请求的多条日志 |
| dissect | 类似 grok 但基于固定分隔符,性能更好 |
| drop | 直接丢弃不需要的事件 |
| fingerprint | 给事件生成唯一标识 |
Output 插件:数据送到哪里去?
elasticsearch —— 写入 Elasticsearch
这是最常用的 Output 插件,生产环境中几乎必用:
confoutput { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "app-logs-%{+YYYY.MM.dd}" template => "/etc/logstash/templates/es-template.json" action => "create" retry_on_conflict => 3 } }
关键参数:
index:支持按时间动态生成索引名,%{+YYYY.MM.dd}按天分索引action:index(默认,覆盖写入)或create(仅当文档不存在时写入,防止重复)retry_on_conflict:版本冲突时重试次数
kafka —— 写入 Kafka
confoutput { kafka { bootstrap_servers => "kafka-broker1:9092" topic_id => "processed-logs" codec => json compression_type => "snappy" } }
其他常用 Output 插件
| 插件 | 场景 |
|---|---|
| file | 写入本地文件 |
| http | 推送到外部 HTTP API |
| redis | 写入 Redis |
| stdout | 控制台输出,调试时常用 |
| 触发告警邮件 | |
| s3 | 归档到 AWS S3 |
| mongodb | 写入 MongoDB |
Codec 插件:数据的编解码
Codec 插件常被忽略,但它影响着数据的序列化方式。常用的是 json 和 multiline:
confinput { file { path => "/var/log/app.log" codec => multiline { pattern => "^%{TIMESTAMP_ISO8601}" negate => true what => "previous" } } }
multiline 用于把 Java 堆栈信息等多行日志合并为一条事件,pattern 匹配新日志行的开头,what => "previous" 表示不匹配的行归入上一条事件。
插件安装和管理
Logstash 通过 bin/logstash-plugin 命令管理插件生命周期:
查看已安装插件
bash# 列出所有插件 bin/logstash-plugin list # 带版本号 bin/logstash-plugin list --verbose # 按分组查看 bin/logstash-plugin list --group input # 模糊搜索 bin/logstash-plugin list '*kafka*'
安装插件
bash# 从 RubyGems 安装 bin/logstash-plugin install logstash-output-s3 # 指定版本 bin/logstash-plugin install logstash-output-s3 --version 10.0.0 # 从本地 gem 文件安装 bin/logstash-plugin install /path/to/logstash-output-custom-1.0.0.gem
更新和卸载
bash# 更新全部插件 bin/logstash-plugin update # 更新指定插件 bin/logstash-plugin update logstash-output-s3 # 卸载插件(Logstash 7.x+ 中部分插件为集成插件,不可卸载) bin/logstash-plugin uninstall logstash-output-s3
离线安装
生产环境通常无法访问外网,需要制作离线安装包:
bash# 在有网络的机器上生成离线包 bin/logstash-plugin prepare-offline-pack logstash-output-s3 # 在目标机器上安装 bin/logstash-plugin install file:///path/to/logstash-offline-plugins.zip
插件配置的实战经验
条件判断让 Pipeline 更高效
不同类型的日志走不同的 Filter 逻辑,避免无关插件浪费算力:
conffilter { if [type] == "nginx-access" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } geoip { source => "clientip" } } else if [type] == "app-error" { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" } } if [level] == "ERROR" { mutate { add_field => { "alert" => "true" } } } } }
grok 解析失败的兜底处理
grok 匹配失败是线上常见问题,必须处理:
conffilter { grok { match => { "message" => "%{PATTERN:field}" } tag_on_failure => ["_grokparsefailure"] } if "_grokparsefailure" in [tags] { mutate { add_field => { "parse_error" => "true" } } } }
性能优化要点
- 优先用 dissect 替代 grok:当日志格式固定时,dissect 基于分隔符拆分,性能比 grok 高出一个量级
- 减少 mutate 调用次数:合并多个 mutate 操作到一个块内,减少事件反复处理
- 慎用 ruby 插件:Ruby 代码执行效率远低于内置插件,只在无法用其他插件实现时才用
- 合理配置 pipeline.workers 和 pipeline.batch.size:workers 数量通常设为 CPU 核心数,batch.size 根据事件大小调整(默认 125,大事件可适当调小)
- 关注慢 Filter:在
logstash.yml中开启config.debug和慢日志,定位瓶颈插件
自定义插件开发
当内置插件无法满足需求时(比如对接公司内部系统),就需要开发自定义插件。
创建插件骨架
bashgem install logstash-plugin-generator logstash-plugin generate --type input --name myinput
生成的目录结构:
shelllogstash-input-myinput/ ├── lib/ │ └── logstash/ │ └── inputs/ │ └── myinput.rb ├── spec/ │ └── inputs/ │ └── myinput_spec.rb ├── Gemfile ├── logstash-input-myinput.gemspec └── README.md
核心实现
一个 Input 插件至少实现 register 和 run 两个方法:
rubyclass LogStash::Inputs::Myinput < LogStash::Inputs::Base config_name "myinput" config :host, :validate => :string, :default => "0.0.0.0" config :port, :validate => :number, :required => true def register # 初始化资源,只在启动时调用一次 @server = TCPServer.new(@host, @port) end def run(queue) # 持续运行,产生事件后推入 queue loop do client = @server.accept while line = client.gets event = LogStash::Event.new("message" => line.chomp) decorate(event) queue << event end client.close end end def stop # 优雅关闭 @server.close if @server end end
构建与安装
bashgem build logstash-input-myinput.gemspec bin/logstash-plugin install logstash-input-myinput-1.0.0.gem
编写测试
rubyrequire "logstash/devutils/rspec/spec_helper" require "logstash/inputs/myinput" describe LogStash::Inputs::Myinput do let(:config) { { "port" => 9999 } } it "registers without error" do input = described_class.new(config) expect { input.register }.not_to raise_error end end
插件版本管理的注意事项
- 集成插件不可卸载:Logstash 7.x 之后,Kafka、Beats 等常用插件被合并为集成插件(
logstash-integration-kafka),无法通过 uninstall 移除 - 锁定版本:在
Gemfile中指定版本避免升级引入兼容问题:gem "logstash-output-s3", "~> 10.0" - 升级策略:
bin/logstash-plugin update默认只升级小版本和补丁版本,不会跨大版本升级,降低破坏性变更风险 - 查看插件版本:
bin/logstash-plugin list --verbose | grep 插件名
掌握 Logstash 插件体系的关键在于理解 Input-Filter-Output 的数据流模型,以及每个插件在链路中的定位。日常使用中,grok 和 mutate 是最需要熟练掌握的 Filter 插件,elasticsearch output 是最核心的输出插件,而插件管理命令则保证你能灵活扩展和维护 Pipeline。