服务端5月30日 12:39
什么是 Logstash?它的工作原理是什么?Logstash 是 Elastic Stack 里的服务端数据处理管道,用来采集、解析、转换并转发日志或事件。它的核心模型很简单:Input 负责进数据,Filter 负责处理数据,Output 负责送到 Elasticsearch、Kafka、文件等目标。真正的价值不只是“搬运日志”,而是把杂乱文本变成可检索、可分析的结构化字段。
## 追问
### Logstash 的三段式流程是什么?
Input 从 File、Beats、Kafka、HTTP 等来源读取事件;Filter 用 grok、mutate、date、geoip 等插件加工字段;Output 把结果写到 ES、Kafka、Redis 或文件。
### Logstash 和 Filebeat 有什么区别?
Filebeat 更轻,主要负责边缘采集和转发;Logstash 更重,适合复杂解析、字段清洗和多目标路由。
### 它的主要瓶颈在哪里?
常见瓶颈在 Grok 正则、输出端 bulk 写入和队列堆积。标签
Logstash
Logstash是一个开源的数据收集引擎,主要用于处理和转换各种日志和事件数据。它支持从多种来源(如文件、网络、消息队列和数据库等)收集数据,并将其转换为统一的格式,以便存储、分析和可视化。Logstash提供了大量的插件和过滤器,可以用于数据转换、数据清洗、数据标准化和数据增强等方面。它还支持多种输出,如Elasticsearch、Redis、Kafka和Splunk等。Logstash可以与Elasticsearch和Kibana等开源工具集成,形成一个完整的ELK(Elasticsearch、Logstash和Kibana)堆栈,用于搜索、分析和可视化数据。Logstash适用于多种场景,如安全监控、日志管理、应用性能监控和业务分析等。

服务端5月30日 12:39
Logstash Grok 过滤器是什么?如何解析日志?Grok 是 Logstash 里把非结构化日志拆成字段的过滤器,本质是“命名正则模板”。它用 `%{PATTERN:field}` 把文本匹配成字段,比如把 Nginx 日志拆出 IP、状态码、请求路径。面试时要说清:Grok 适合解析固定格式文本;解析失败会打 `_grokparsefailure`;性能上要避免一上来就用 `%{GREEDYDATA}` 贪婪匹配。
## 追问
### Grok 和普通正则有什么区别?
Grok 是对正则的封装,内置了很多模式,如 `IP`、`NUMBER`、`TIMESTAMP_ISO8601`。
### 解析失败怎么排查?
先用 Grok Debugger 验证模式,再看事件里是否出现 `_grokparsefailure`。
## 写段配置
```conf
filter {
grok { match => { "message" => "%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:level} %{GREEDYDATA:msg}" } }
}
```服务端5月30日 12:39
Logstash 常用输入插件有哪些?File 和 Kafka 怎么配置?Logstash 输入插件负责从数据源读取事件,常见的有 File、Beats、Kafka、HTTP、TCP/UDP、Syslog、JDBC、Redis。File 适合读取本机或挂载目录里的日志,Kafka 适合高吞吐、可回放的日志总线。生产里通常让 Filebeat 采集文件,再发给 Logstash;只有需要复杂本地读取规则时,才直接用 File input。
## 追问
### File input 的 start_position 有什么坑?
它只在文件第一次被 Logstash 发现时生效。读到哪里由 sincedb 记录,改成 `beginning` 不会自动重读旧文件。
### Kafka input 为什么要配 group_id?
同组多个 Logstash 实例会分摊分区,不同组会各自消费一份。
## 写段配置
```conf
input {
file { path => ["/var/log/app/*.log"] start_position => "end" }
kafka { bootstrap_servers => "kafka:9092" topics => ["app-logs"] group_id => "logstash-app" }
}
```服务端5月30日 12:39
Logstash 常用输出插件有哪些?Elasticsearch 输出怎么配置?Logstash 输出插件负责把处理后的事件送到目标系统。常见输出有 Elasticsearch、File、Kafka、Redis、HTTP、Stdout;生产里最常用的是 Elasticsearch,通常要配 hosts、index、认证、TLS、失败重试和条件路由。索引名建议按业务和日期拆分,比如 `app-%{[service]}-%{+YYYY.MM.dd}`,不要所有日志都塞进一个大索引。
## 追问
### Elasticsearch 输出必须配哪些参数?
至少配 `hosts` 和 `index`。如果集群开启安全认证,还要配 `user/password`、`ssl`、`cacert`。
### Kafka、File、Stdout 分别适合什么场景?
Kafka 适合解耦下游,File 适合落盘备份,Stdout 主要用于调试。
### 实际项目里最容易踩什么坑?
动态索引字段为空会生成脏索引,ES 认证或证书错误会导致持续重试。
## 写段配置
```conf
output {
elasticsearch { hosts => ["https://es1:9200"] index => "app-%{+YYYY.MM.dd}" }
}
```服务端5月30日 12:39
Logstash 条件判断怎么写?常见操作符有哪些?Logstash 条件判断写在 `filter` 和 `output` 里最常见,用来按字段、标签、正则或数值把事件分流。语法类似 `if/else if/else`:比较用 `== != < > <= >=`,逻辑用 `and or not`,正则用 `=~ !~`,包含判断用 `in`、`not in`。注意条件一般不能放在 input 插件内部,因为 input 阶段事件字段还没生成。
## 追问
### `in` 判断字符串和数组有什么坑?
`"error" in [tags]` 是判断标签数组是否包含 error;`"err" in [message]` 可能变成字符串包含判断。
### 正则匹配适合放很多吗?
不适合滥用。大量复杂正则会拖慢 pipeline,高频字段能用精确匹配就别用正则。
### 解析失败怎么处理?
利用 `_grokparsefailure`,再用条件把失败事件打标、落单独索引或输出到排查队列。
## 写段配置
```conf
filter {
if [type] == "nginx" and [status] >= 500 {
mutate { add_tag => ["server_error"] }
}
}
```服务端5月30日 12:39
Logstash 集群如何部署?高可用方案怎么选?Logstash 本身没有主从式集群,所谓 Logstash 集群通常是多实例横向部署:上游用 Beats 负载均衡、Kafka/Redis 缓冲,或 LB 分发流量;下游写 Elasticsearch。高可用重点不是“选主”,而是让输入可重放、实例可替换、配置一致、队列不丢数据。生产里优先推荐 Filebeat loadbalance + 多 Logstash;流量大或不能丢日志时,在前面加 Kafka,再让多个 Logstash consumer group 消费。
## 追问
### Beats 直连和 Kafka 缓冲怎么选?
Beats 直连简单、延迟低,适合可接受短暂重试的日志链路。Kafka 多一层运维成本,但能削峰、回放、隔离 Logstash 故障。
### 持久化队列能替代 Kafka 吗?
不能完全替代。persisted queue 只保护单个 Logstash 节点本地未处理事件,节点磁盘坏了仍可能丢。
### 多节点配置怎么管理?
配置放 Git,用 Ansible、K8s ConfigMap 或镜像发布同步。上线前跑 `--config.test_and_exit`。
## 写段配置
```yaml
queue.type: persisted
queue.max_bytes: 1gb
pipeline.workers: 4
pipeline.batch.size: 500
```服务端5月27日 20:20
Logstash 在 ELK Stack 中扮演什么角色,与 Elasticsearch 和 Kibana 如何协作?## 答案
Logstash 是 ELK Stack 的数据采集与处理管道,负责从多种数据源收集日志,经解析、过滤、转换后输出到 Elasticsearch;Elasticsearch 承担索引存储与全文检索;Kibana 提供可视化与交互界面。三者协作:数据源 → Logstash(采集+处理)→ Elasticsearch(存储+检索)→ Kibana(可视化)。
## Logstash 的核心职责
Logstash 基于 input-filter-output 三段式管道:
- **Input**:从文件、syslog、Kafka、Beats 等数据源采集原始日志
- **Filter**:用 Grok 解析非结构化日志为结构化字段,用 Mutate 修改字段,用 Date 标准化时间,用 GeoIP 丰富地理信息
- **Output**:将处理后的数据写入 Elasticsearch,也可同时输出到 Kafka、文件等
配置示例:
```
input { beats { port => 5044 } }
filter {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
mutate { remove_field => ["tags"] }
}
output { elasticsearch { hosts => ["localhost:9200"] index => "app-logs" } }
```
## 三者协作的关键机制
Logstash 与 Elasticsearch 通过 REST API 批量写入,默认按批次刷写提升吞吐。引入 Kafka 或 Redis 作为缓冲层可解耦采集与写入,应对流量突增。Elasticsearch 为 Kibana 提供查询与聚合 API,Kibana 基于此构建仪表板和告警。
当 Logstash 处理能力不足时,可用 Beats 替代其采集角色,Logstash 专注过滤转换,形成 Beats → Logstash → Elasticsearch 的分层架构。Logstash 还支持持久化队列(PQ)防止数据丢失,死信队列(DLQ)捕获处理失败的事件。
## 追问
**Q: Logstash 与 Fluentd 怎么选?**
Logstash 插件生态丰富(200+),适合复杂 ETL;Fluentd 基于 CRuby 更轻量,Kubernetes 环境下常用 Fluentd 替代 Logstash。
**Q: 如何排查 Logstash 管道性能瓶颈?**
用 `--config.test_and_exit` 验证配置,`--log.level debug` 观察事件流,调整 `pipeline.workers`(建议等于 CPU 核数)和 `pipeline.batch.size`,监控队列积压。
**Q: Logstash 如何保证数据不丢失?**
开启持久化队列(`queue.type: persisted`),事件先写磁盘再处理;配合死信队列捕获解析失败的事件,避免静默丢弃。服务端5月27日 18:26
Logstash 有哪些常用的插件,如何安装和管理插件?Logstash 的强大之处在于它的插件体系——输入、过滤、输出三大类插件覆盖了从数据采集到写入目标的全链路。面试中经常问到"Logstash 有哪些常用插件""怎么安装和管理插件",下面结合实际使用场景梳理清楚。
## Input 插件:数据从哪里来?
Input 插件决定 Logstash 从哪个数据源读取数据。选对 Input 插件是搭建 Pipeline 的第一步。
### file —— 读日志文件
file 是最基础的 Input 插件,行为类似 `tail -f`,持续读取文件新增内容:
```conf
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
}
}
```
几个关键参数:
- `start_position`:首次读取时从文件开头(`beginning`)还是末尾(`end`)开始,默认 `end`
- `sincedb_path`:记录已读取位置的文件路径,重启后从断点续读;设为 `/dev/null` 则每次从头读
- `mode`:默认 `tail` 模式持续追踪,设为 `read` 则读完即退出
### beats —— 接收 Beats 数据
Beats 是 Elastic 官方的轻量采集器家族(Filebeat、Metricbeat 等),beats 插件是 Logstash 与 Beats 配合的标准方式:
```conf
input {
beats {
port => 5044
}
}
```
生产环境中,Beats 负责在各服务器上采集数据,再统一发送到 Logstash 做集中处理,这是 ELK 架构中最常见的组合。
### kafka —— 从 Kafka 消费消息
当数据量大、需要缓冲或多个消费者协同工作时,Kafka 是首选的中间层:
```conf
input {
kafka {
bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"
topics => ["app-logs", "business-events"]
group_id => "logstash-consumer"
consumer_threads => 4
}
}
```
### 其他常用 Input 插件
| 插件 | 典型场景 |
|------|---------|
| jdbc | 定时从关系型数据库拉取增量数据 |
| http | 对外暴露 HTTP 接口,接收外部系统推送的数据 |
| tcp / udp | 接收网络协议数据,常用于收集 syslog |
| syslog | 专门解析 syslog 格式日志 |
| redis | 从 Redis List 或 Channel 读取数据 |
| elasticsearch | 从 ES 中查询数据做二次处理 |
| s3 | 从 AWS S3 桶读取归档日志 |
## Filter 插件:数据怎么加工?
Filter 插件负责把非结构化的原始数据转换成结构化、可搜索的字段。这是 Logstash 最核心的能力。
### grok —— 解析非结构化日志
grok 是使用频率最高的 Filter 插件,通过正则表达式模式把文本拆解成字段:
```conf
filter {
grok {
match => {
"message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request_uri} %{NUMBER:response_code:int} %{NUMBER:bytes:int}"
}
tag_on_failure => ["_grokparsefailure"]
}
}
```
关键点:
- Logstash 内置了大量命名模式(如 `IP`、`HOSTNAME`、`COMBINEDAPACHELOG`),优先使用内置模式
- `tag_on_failure`:匹配失败时打上标签,方便后续排查未解析的日志
- 性能瓶颈:grok 基于正则,匹配复杂模式时 CPU 开销大,大规模数据场景下可考虑用 dissect 替代
### mutate —— 字段操作
mutate 用于对字段进行增删改查,是日常配置中使用最频繁的 Filter 之一:
```conf
filter {
mutate {
rename => { "old_field" => "new_field" }
remove_field => ["unused_field"]
convert => {
"response_code" => "integer"
"latency" => "float"
}
gsub => [
"message", "\", "/"
]
}
}
```
### date —— 时间戳解析
日志中的时间格式五花八门,date 插件负责将字符串时间解析为 Logstash 事件的时间戳:
```conf
filter {
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss", "ISO8601"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
```
注意:如果不确定时间格式,可以传多个模式数组,date 插件会依次尝试匹配。
### 其他常用 Filter 插件
| 插件 | 作用 |
|------|------|
| json | 解析 JSON 字符串为字段 |
| csv | 按分隔符拆分 CSV 格式数据 |
| geoip | 根据 IP 查询地理位置 |
| useragent | 解析浏览器 User-Agent |
| ruby | 用 Ruby 代码实现复杂逻辑(性能敏感场景慎用) |
| aggregate | 跨事件聚合,如关联同一个请求的多条日志 |
| dissect | 类似 grok 但基于固定分隔符,性能更好 |
| drop | 直接丢弃不需要的事件 |
| fingerprint | 给事件生成唯一标识 |
## Output 插件:数据送到哪里去?
### elasticsearch —— 写入 Elasticsearch
这是最常用的 Output 插件,生产环境中几乎必用:
```conf
output {
elasticsearch {
hosts => ["http://es-node1:9200", "http://es-node2:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
template => "/etc/logstash/templates/es-template.json"
action => "create"
retry_on_conflict => 3
}
}
```
关键参数:
- `index`:支持按时间动态生成索引名,`%{+YYYY.MM.dd}` 按天分索引
- `action`:`index`(默认,覆盖写入)或 `create`(仅当文档不存在时写入,防止重复)
- `retry_on_conflict`:版本冲突时重试次数
### kafka —— 写入 Kafka
```conf
output {
kafka {
bootstrap_servers => "kafka-broker1:9092"
topic_id => "processed-logs"
codec => json
compression_type => "snappy"
}
}
```
### 其他常用 Output 插件
| 插件 | 场景 |
|------|------|
| file | 写入本地文件 |
| http | 推送到外部 HTTP API |
| redis | 写入 Redis |
| stdout | 控制台输出,调试时常用 |
| email | 触发告警邮件 |
| s3 | 归档到 AWS S3 |
| mongodb | 写入 MongoDB |
## Codec 插件:数据的编解码
Codec 插件常被忽略,但它影响着数据的序列化方式。常用的是 json 和 multiline:
```conf
input {
file {
path => "/var/log/app.log"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
```
multiline 用于把 Java 堆栈信息等多行日志合并为一条事件,`pattern` 匹配新日志行的开头,`what => "previous"` 表示不匹配的行归入上一条事件。
## 插件安装和管理
Logstash 通过 `bin/logstash-plugin` 命令管理插件生命周期:
### 查看已安装插件
```bash
# 列出所有插件
bin/logstash-plugin list
# 带版本号
bin/logstash-plugin list --verbose
# 按分组查看
bin/logstash-plugin list --group input
# 模糊搜索
bin/logstash-plugin list '*kafka*'
```
### 安装插件
```bash
# 从 RubyGems 安装
bin/logstash-plugin install logstash-output-s3
# 指定版本
bin/logstash-plugin install logstash-output-s3 --version 10.0.0
# 从本地 gem 文件安装
bin/logstash-plugin install /path/to/logstash-output-custom-1.0.0.gem
```
### 更新和卸载
```bash
# 更新全部插件
bin/logstash-plugin update
# 更新指定插件
bin/logstash-plugin update logstash-output-s3
# 卸载插件(Logstash 7.x+ 中部分插件为集成插件,不可卸载)
bin/logstash-plugin uninstall logstash-output-s3
```
### 离线安装
生产环境通常无法访问外网,需要制作离线安装包:
```bash
# 在有网络的机器上生成离线包
bin/logstash-plugin prepare-offline-pack logstash-output-s3
# 在目标机器上安装
bin/logstash-plugin install file:///path/to/logstash-offline-plugins.zip
```
## 插件配置的实战经验
### 条件判断让 Pipeline 更高效
不同类型的日志走不同的 Filter 逻辑,避免无关插件浪费算力:
```conf
filter {
if [type] == "nginx-access" {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
geoip { source => "clientip" }
} else if [type] == "app-error" {
grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" } }
if [level] == "ERROR" {
mutate { add_field => { "alert" => "true" } }
}
}
}
```
### grok 解析失败的兜底处理
grok 匹配失败是线上常见问题,必须处理:
```conf
filter {
grok {
match => { "message" => "%{PATTERN:field}" }
tag_on_failure => ["_grokparsefailure"]
}
if "_grokparsefailure" in [tags] {
mutate {
add_field => { "parse_error" => "true" }
}
}
}
```
### 性能优化要点
- **优先用 dissect 替代 grok**:当日志格式固定时,dissect 基于分隔符拆分,性能比 grok 高出一个量级
- **减少 mutate 调用次数**:合并多个 mutate 操作到一个块内,减少事件反复处理
- **慎用 ruby 插件**:Ruby 代码执行效率远低于内置插件,只在无法用其他插件实现时才用
- **合理配置 pipeline.workers 和 pipeline.batch.size**:workers 数量通常设为 CPU 核心数,batch.size 根据事件大小调整(默认 125,大事件可适当调小)
- **关注慢 Filter**:在 `logstash.yml` 中开启 `config.debug` 和慢日志,定位瓶颈插件
## 自定义插件开发
当内置插件无法满足需求时(比如对接公司内部系统),就需要开发自定义插件。
### 创建插件骨架
```bash
gem install logstash-plugin-generator
logstash-plugin generate --type input --name myinput
```
生成的目录结构:
```
logstash-input-myinput/
├── lib/
│ └── logstash/
│ └── inputs/
│ └── myinput.rb
├── spec/
│ └── inputs/
│ └── myinput_spec.rb
├── Gemfile
├── logstash-input-myinput.gemspec
└── README.md
```
### 核心实现
一个 Input 插件至少实现 `register` 和 `run` 两个方法:
```ruby
class LogStash::Inputs::Myinput < LogStash::Inputs::Base
config_name "myinput"
config :host, :validate => :string, :default => "0.0.0.0"
config :port, :validate => :number, :required => true
def register
# 初始化资源,只在启动时调用一次
@server = TCPServer.new(@host, @port)
end
def run(queue)
# 持续运行,产生事件后推入 queue
loop do
client = @server.accept
while line = client.gets
event = LogStash::Event.new("message" => line.chomp)
decorate(event)
queue << event
end
client.close
end
end
def stop
# 优雅关闭
@server.close if @server
end
end
```
### 构建与安装
```bash
gem build logstash-input-myinput.gemspec
bin/logstash-plugin install logstash-input-myinput-1.0.0.gem
```
### 编写测试
```ruby
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/myinput"
describe LogStash::Inputs::Myinput do
let(:config) { { "port" => 9999 } }
it "registers without error" do
input = described_class.new(config)
expect { input.register }.not_to raise_error
end
end
```
## 插件版本管理的注意事项
- **集成插件不可卸载**:Logstash 7.x 之后,Kafka、Beats 等常用插件被合并为集成插件(`logstash-integration-kafka`),无法通过 uninstall 移除
- **锁定版本**:在 `Gemfile` 中指定版本避免升级引入兼容问题:`gem "logstash-output-s3", "~> 10.0"`
- **升级策略**:`bin/logstash-plugin update` 默认只升级小版本和补丁版本,不会跨大版本升级,降低破坏性变更风险
- **查看插件版本**:`bin/logstash-plugin list --verbose | grep 插件名`
掌握 Logstash 插件体系的关键在于理解 Input-Filter-Output 的数据流模型,以及每个插件在链路中的定位。日常使用中,grok 和 mutate 是最需要熟练掌握的 Filter 插件,elasticsearch output 是最核心的输出插件,而插件管理命令则保证你能灵活扩展和维护 Pipeline。服务端5月27日 18:24
Logstash 性能怎么调?从瓶颈定位到参数优化的实战方案Logstash 吞吐量上不去,CPU 打满却处理不完日志,这类问题在生产环境里太常见了。很多团队第一反应是加机器,但多数情况下调对参数就能让现有资源发挥出两三倍的吞吐。
这篇文章从实际踩坑经验出发,讲清楚 Logstash 性能瓶颈怎么定位、各参数怎么调、调了之后有什么效果。读完你会知道:什么时候该调 pipeline.workers,什么时候该加 Kafka 缓冲,G1GC 到底有没有用,以及那些看起来合理但实际拖慢速度的配置。
## 先定位瓶颈再动手
调优最忌讳盲目改参数。动手之前,先用 Logstash 自带的监控 API 看清楚瓶颈在哪:
```bash
curl -s localhost:9600/_node/stats/pipelines | jq '.pipelines.main'
```
重点关注这几个指标:
| 指标 | 含义 | 健康范围 |
|------|------|----------|
| `events.in` | 每秒摄入事件数 | 接近输入源速率 |
| `events.out` | 每秒输出事件数 | 与 `events.in` 基本持平 |
| `events.filtered` | 过滤后事件数 | 合理的过滤率 |
| `pipeline.workers` 活跃数 | 当前工作线程 | 等于配置值 |
| `queue.type` | 队列类型 | memory 或 persisted |
如果 `events.in` 远大于 `events.out`,说明处理速度跟不上摄入速度,瓶颈在 filter 或 output。如果 CPU 使用率低但吞吐上不去,问题可能出在 I/O 等待或网络延迟上。
## JVM 调优:堆内存和 GC 怎么配
Logstash 跑在 JVM 上,内存配置直接影响性能。在 `config/jvm.options` 里调整:
```bash
-Xms4g
-Xmx4g
-XX:+UseG1GC
```
**堆内存设置原则**:
- Xms 和 Xmx 设成一样的值。动态扩缩容会触发 Full GC,导致处理暂停,日志管道会短暂卡顿
- 堆内存不要超过物理内存的 50%。Logstash 本身还需要堆外内存做缓冲区和网络 I/O,堆太大会让操作系统可用内存不足,反而触发 swap
- 大多数场景 4-8GB 就够了。超过 8GB 不一定更好——堆越大,GC 扫描的时间越长,G1GC 在 4-8GB 区间表现最好
**G1GC 是否值得开?** 实测下来,G1GC 相比默认的 Parallel GC,在堆内存 4GB 以上时 Full GC 暂停时间从秒级降到百毫秒级。但如果堆只有 2GB,G1GC 的分区管理开销反而可能让吞吐量下降 5%-10%。所以:4GB 以上开 G1GC,2GB 以下用默认的就行。
**一个容易踩的坑**:如果日志里有大量 Grok 解析失败,会产生异常对象堆积在堆里。这时候调大堆只是延缓问题,根本办法是修 Grok 模式或用 `if` 条件跳过不需要解析的日志。
## Pipeline 参数:workers、batch size、delay 怎么平衡
这三个参数互相影响,单独调一个往往看不到效果。
### pipeline.workers
```conf
# logstash.yml
pipeline.workers: 4
```
这是处理事件的线程数。默认值是 CPU 核心数,但有个前提:你的 filter 和 output 插件必须是线程安全的。大多数官方插件没问题,但自定义插件需要确认。
实际调法:先设成 CPU 核心数跑基准测试,然后分别试 `核心数/2` 和 `核心数*2`,看哪个 EPS 最高。经验上,filter 重(大量 Grok 正则)的场景设成核心数就行,filter 轻但 output 重(往 ES 写入)的场景可以适当加倍。
### pipeline.batch.size
```conf
pipeline.batch.size: 125
```
每个 worker 一次拿多少事件来处理。默认 125 是个保守值。增大 batch size 能减少事件调度开销,提高吞吐量:
- **高吞吐场景**(日志量 > 10万/分钟):调到 500-1000
- **低延迟场景**(实时告警):保持 125 或更小
batch size 不是越大越好。过大的 batch 会导致单个批次处理时间变长,增加事件从进入到输出端的端到端延迟。而且如果 filter 里有 Grok 失败的情况,大 batch 会让重试开销放大。
### pipeline.batch.delay
```conf
pipeline.batch.delay: 50
```
worker 等待多久凑够一个 batch 再开始处理,单位毫秒。默认 50ms。这个参数的意义是:当事件流入速度不够快时,等一等能凑满 batch,减少处理次数。
- 事件流入速度很快:delay 可以降到 10-20ms,减少等待
- 事件流入速度慢但实时性要求高:降到 5ms 甚至 1ms
- 事件流入速度慢且不要求实时:保持 50ms,省 CPU
**三者联动经验**:高吞吐场景用 `workers=核心数, batch.size=500, batch.delay=10`;低延迟场景用 `workers=4, batch.size=50, batch.delay=5`。改一个参数时保持其他两个不变,观察 EPS 变化,找到拐点。
## Filter 优化:减少无用功
Filter 是 Logstash 最容易成为瓶颈的环节,尤其是 Grok。
### Grok 是性能杀手
Grok 底层是正则表达式,每条日志都要跑一遍模式匹配。优化 Grok 的方法:
1. **用 `if` 条件跳过不需要 Grok 的日志**:
```conf
filter {
if [type] == "nginx_access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
}
```
这看起来简单,但实际效果可能比调参数还明显。一条不需要 Grok 的日志跳过正则匹配,省下的是毫秒级的 CPU 时间。
2. **自定义模式比组合内置模式快**:内置的 COMBINEDAPACHELOG 实际上是多个小模式拼接的,每次匹配都要逐个尝试。写成一条自定义模式能减少匹配次数:
```conf
# 自定义模式文件
NGINX_ACCESS %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes}
```
3. **把最常匹配的模式放前面**:Grok 是按顺序尝试匹配的,最可能的模式放第一个能最快命中。
4. **用 dissect 替代简单格式的 Grok**:如果日志格式是固定的分隔符(如管道符、逗号分隔),dissect 插件用分隔符切分,比正则匹配快 5-10 倍:
```conf
filter {
dissect {
mapping => { "message" => "%{ip} | %{user} | %{action}" }
}
}
```
### 其他 Filter 优化
- **mutate 的 remove_field**:尽早删掉不需要的字段,减少后续处理的数据量
- **用 drop 过滤器丢弃无用事件**:在 filter 链最前面丢弃,比处理完再丢省很多资源
- **避免重复解析**:如果上游已经做过 JSON 解析,不要再用 json filter 再解析一遍
## Output 优化:往 ES 写数据的讲究
ES output 是最常见的瓶颈之一。
### 批量写入参数
```conf
output {
elasticsearch {
hosts => ["http://es-cluster:9200"]
http_compression => true
}
}
```
注意:旧版本的 `flush_size` 和 `idle_flush_time` 参数已经在 7.x 之后废弃,改由 pipeline 的 batch size 和 batch delay 统一控制。如果你还在用这两个参数,升级后删掉,否则会有告警。
**http_compression => true** 这个一定要开。压缩后网络传输量减少 60%-80%,对跨机房写入场景效果尤其明显,CPU 开销可以忽略。
### 连接池调优
如果 ES 集群有多个节点,Logstash 会自动轮询写入。但默认连接池大小可能不够,高并发场景下可以在 ES output 里显式配置:
```conf
output {
elasticsearch {
hosts => ["http://es-node1:9200", "http://es-node2:9200", "http://es-node3:9200"]
http_compression => true
# 新版本支持的批量操作参数
action => "index"
}
}
```
## 持久队列:防数据丢失的最后防线
```conf
# logstash.yml
queue.type: persisted
path.queue: /data/logstash/queue
queue.page_capacity: 250mb
queue.max_events: 0
queue.max_bytes: 4gb
```
持久队列把事件写到磁盘,Logstash 重启或崩溃时不丢数据。代价是吞吐量下降 10%-20%,因为每次事件要写磁盘。
什么场景该开持久队列:
- 数据不能丢(金融日志、审计日志)
- 下游 ES 不稳定,偶尔写入失败
- Logstash 重启频繁
什么场景可以不开:
- 日志允许少量丢失(纯分析用途的访问日志)
- 下游写入非常稳定
- 对吞吐量有极致要求
## 架构层面:加缓冲和水平扩展
单机调优总有上限。当一台 Logstash 处理不过来,架构上的调整比继续压单机更有效。
### 加 Kafka 缓冲
```
Filebeat → Kafka → Logstash → Elasticsearch
```
Kafka 在中间起两个作用:一是缓冲突发流量,Logstash 处理不过来时 Kafka 先存着;二是解耦,上游采集和下游处理互不影响。
Kafka 场景下的 Logstash 配置要点:
```conf
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["nginx-logs"]
group_id => "logstash-nginx"
consumer_threads => 4
auto_offset_reset => "earliest"
}
}
```
`consumer_threads` 建议设成 Kafka 分区数。如果分区数是 12,设 12 个 consumer 线程能充分利用并行消费。
### 多实例水平扩展
起多个 Logstash 实例,用负载均衡或 Kafka consumer group 分流:
- 如果输入源是 Kafka:每个 Logstash 实例配相同的 `group_id`,Kafka 自动分配分区给不同实例
- 如果输入源是 Beats:在 Beats 和 Logstash 之间加一层 Nginx 或 HAProxy 做 TCP 负载均衡
### 用 Beats 替代 Logstash 做采集
Filebeat、Metricbeat 比 Logstash 轻量得多,资源占用大约是 Logstash 的 1/10。架构上让 Beats 做采集、Logstash 做处理,比让 Logstash 又采集又处理高效得多。
## 怎么验证优化效果
每次只改一个参数,跑一轮基准测试对比。用 Logstash 自带的 generator 输入插件做压测:
```conf
input {
generator {
lines => ["192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /api/users HTTP/1.1\" 200 2326"]
count => 1000000
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout { codec => dots }
}
```
跑完后看输出的时间,算出 EPS(每秒处理事件数)。把这个值作为基准,改一个参数再跑,对比变化。
生产环境监控关键指标:`events.in` 和 `events.out` 的差值(积压量)、JVM 堆使用率、GC 频率和耗时。如果堆使用率持续超过 75% 或 Full GC 频率超过每分钟一次,说明要么堆太小,要么 filter 有内存泄漏。
调优没有一劳永逸的方案。日志格式变了、流量模式变了、ES 集群扩容了,都可能让之前的调优配置不再最优。养成定期看监控、定期跑基准测试的习惯,比任何单次调优都重要。服务端5月27日 18:22
Logstash 有哪些常用过滤器?Grok 和 Mutate 怎么用?## Grok 过滤器:把非结构化日志变成结构化数据
Grok 是 Logstash 中使用频率最高的过滤器,核心能力是将一行纯文本日志拆解成有名字段的 JSON。它的底层原理是基于正则表达式的模式匹配,但 Elastic 已经预置了大量常用模式,日常使用不需要手写正则。
### 基本匹配
最典型的场景是解析 Apache/Nginx 访问日志。`COMBINEDAPACHELOG` 是内置模式,一条配置就能提取 clientip、response_code、bytes 等十几个字段:
```conf
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
```
匹配成功后,原来 message 字段中的一整行日志会被拆成 `clientip`、`ident`、`auth`、`timestamp`、`verb`、`request`、`httpversion`、`response`、`bytes` 等独立字段,后续过滤器和输出插件都能直接引用。
### 多模式备选
实际生产中,日志格式往往不止一种。Grok 支持传入一个模式数组,按顺序依次尝试匹配,命中的第一个生效:
```conf
filter {
grok {
match => {
"message" => [
"%{COMBINEDAPACHELOG}",
"%{COMMONAPACHELOG}",
"%{SYSLOGBASE} %{GREEDYDATA:message}"
]
}
}
}
```
这种方式比写一个超长的"万能正则"更易维护,哪条模式匹配了也更容易排查。
### 自定义模式
当内置模式无法满足需求时,可以在外部文件中定义自己的模式。模式文件的语法是 `PATTERN_NAME regex`,一行一个:
```conf
filter {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => {
"message" => "%{MYAPP_LOG:myapp_field}"
}
}
}
```
对应的 `/etc/logstash/patterns/myapp` 文件内容示例:
```
MYAPP_LOG \[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}
```
### Grok 匹配失败怎么办
Grok 匹配失败时,Logstash 会自动给事件打上 `_grokparsefailure` 标签。在生产环境中,应该用条件判断捕获这些失败事件,避免脏数据静默进入 Elasticsearch:
```conf
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
if "_grokparsefailure" in [tags] {
mutate {
add_field => { "parse_error" => "grok failed for message" }
}
}
}
```
也可以借助 Kibana 自带的 Grok Debugger 工具调试模式,避免反复重启 Logstash。
## Mutate 过滤器:字段级别的加工工具
Mutate 是"万能修理工",几乎所有的字段增删改操作都能靠它完成。它不关心数据来源,只对已有字段做变换。
### 重命名字段
从 Beats 或其他输入源拿到的字段名不符合规范时,用 rename 统一命名:
```conf
filter {
mutate {
rename => { "client_ip" => "source_ip" }
}
}
```
### 类型转换
Grok 解析出来的字段默认都是字符串类型,想做数值聚合或范围查询,必须先转换类型:
```conf
filter {
mutate {
convert => {
"response" => "integer"
"request_time" => "float"
}
}
}
```
这一步经常被忽略,导致 Elasticsearch 中所有字段都是 keyword 类型,数值范围查询直接失效。
### 删除无用字段
每个事件默认携带 message、@version、host 等字段。如果已经用 Grok 把 message 拆成了独立字段,原始 message 留着只会浪费存储:
```conf
filter {
mutate {
remove_field => ["message", "@version", "host"]
}
}
```
### 替换和追加字段
replace 会覆盖已有字段或新建字段,add_field 则是在原有字段基础上追加:
```conf
filter {
mutate {
replace => { "log_source" => "production-nginx" }
add_field => {
"environment" => "prod"
"pipeline" => "nginx-access"
}
}
}
```
### gsub:正则替换字段内容
Mutate 自带 gsub 操作,可以对字段值做正则替换,不需要动用 Ruby 过滤器:
```conf
filter {
mutate {
gsub => [
"request", "\\?.+$", ""
]
}
}
```
这会把 `/api/users?page=1&size=10` 替换为 `/api/users`,去掉查询参数部分,便于按路径做聚合统计。
### 大小写转换与分割
```conf
filter {
mutate {
uppercase => ["log_level"]
split => { "tags" => "," }
}
}
```
uppercase 将字段值转为大写,split 按指定分隔符将字符串拆成数组。这两个操作在数据规范化场景中很常用。
### Mutate 各操作的执行顺序
Mutate 内部有固定的操作执行顺序,与你在配置中写的顺序无关:`rename` → `copy` → `gsub` → `uppercase`/`lowercase` → `strip` → `replace` → `join` → `split` → `merge` → `coerce` → `convert` → `add_field` → `remove_field`。如果 rename 在 convert 之后才生效,可能让类型转换的目标字段名对不上。遇到这类问题时,可以拆成两个 mutate 块来控制顺序:
```conf
filter {
mutate {
rename => { "resp_code" => "response" }
}
mutate {
convert => { "response" => "integer" }
}
}
```
## Date 过滤器:统一时间戳格式
Logstash 用 `@timestamp` 作为事件的时间基准,但原始日志中的时间格式千差万别。Date 过滤器的作用就是把各种格式的时间字符串解析成 Logstash 内部的 ISO8601 时间对象。
### 解析多种时间格式
```conf
filter {
date {
match => [
"timestamp",
"dd/MMM/yyyy:HH:mm:ss Z",
"yyyy-MM-dd HH:mm:ss",
"ISO8601"
]
}
}
```
match 的第二个参数是格式数组,Joda-Time 格式和 ISO8601 关键字都可以混用。解析成功后,`@timestamp` 自动更新。
### 指定时区和目标字段
```conf
filter {
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss"]
timezone => "Asia/Shanghai"
target => "parsed_time"
}
}
```
如果不指定 target,默认写入 `@timestamp`。如果只想保留解析结果但不动 `@timestamp`,就指定一个自定义的 target 字段。
## GeoIP 过滤器:IP 地址转地理位置
GeoIP 根据 IP 地址查询 MaxMind 数据库,自动补充城市、国家、经纬度等地理信息,是做访问地图可视化的前提。
```conf
filter {
geoip {
source => "client_ip"
target => "geo"
fields => ["city_name", "country_name", "location"]
}
}
```
source 指定待查询的 IP 字段,fields 限制只输出需要的地理字段,避免写入过多无用数据。注意 Logstash 默认内置了 GeoLite2 数据库,但如果需要更精确的数据,需要手动下载并指定 database 路径。
## JSON 过滤器:解析嵌套 JSON 日志
现代应用的日志越来越倾向于直接输出 JSON 格式,JSON 过滤器可以把它展开成 Logstash 的事件字段:
```conf
filter {
json {
source => "message"
target => "parsed"
}
}
```
如果指定了 target,解析结果会放在 target 字段下形成嵌套结构;不指定则直接铺平到顶层。生产中建议指定 target,避免字段名冲突。解析后通常配合 mutate 删除原始 message 字段:
```conf
filter {
json {
source => "message"
target => "log"
}
mutate {
remove_field => ["message"]
}
}
```
## Useragent 过滤器:解析浏览器信息
从 HTTP 请求的 User-Agent 头中提取浏览器名称、版本、操作系统等信息:
```conf
filter {
useragent {
source => "agent"
target => "ua"
}
}
```
通常跟在 Grok 解析 Apache 日志之后使用,`agent` 字段就是 Grok 从日志中提取出来的 User-Agent 字符串。
## CSV 过滤器:处理表格数据
CSV 过滤器用于解析逗号(或其他分隔符)分隔的文本数据:
```conf
filter {
csv {
separator => ","
columns => ["name", "age", "city"]
autodetect_column_types => true
}
}
```
columns 指定每列的字段名,autodetect_column_types 让 Logstash 自动识别数值类型。如果 CSV 首行是表头,也可以省略 columns 让它自动读取。
## Ruby 过滤器:处理复杂逻辑
当内置过滤器无法满足需求时,Ruby 过滤器提供了完全的编程能力:
```conf
filter {
ruby {
code => '
status = event.get("response").to_i
if status >= 400
event.tag("error")
event.set("error_level", status >= 500 ? "server_error" : "client_error")
end
'
}
}
```
Ruby 过滤器灵活但性能开销大,Grok 能搞定的事情不要用 Ruby。实际项目中,Ruby 过滤器多用于多字段联合计算、条件标签打标等场景。
## Drop 过滤器:丢弃不需要的事件
Drop 过滤器直接丢弃整个事件,不会传到输出阶段。常见用法是过滤掉调试日志或特定来源的噪声数据:
```conf
filter {
if [log_level] == "DEBUG" or [message] =~ /^health check/ {
drop { }
}
}
```
使用时注意加上条件判断,否则会丢掉所有事件。
## 过滤器的组合与顺序
实际项目中,过滤器总是组合使用的。一个典型的 Nginx 访问日志处理管线:
```conf
filter {
# 第一步:用 Grok 把日志拆成字段
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# 第二步:转换数值类型
mutate {
convert => { "response" => "integer" }
convert => { "bytes" => "integer" }
remove_field => ["message"]
}
# 第三步:解析时间戳
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
# 第四步:补充地理位置
geoip {
source => "clientip"
}
# 第五步:解析 User-Agent
useragent {
source => "agent"
target => "ua"
}
}
```
过滤器的执行顺序就是配置中的书写顺序。一般遵循"先解析、再转换、后丰富"的原则:Grok 在最前面把原始文本拆开,Mutate 紧跟其后做类型转换和字段清理,Date/GeoIP/Useragent 等根据已有字段做信息补充。
## 日常排错建议
- **Grok 匹配失败**:检查 Kibana Grok Debugger,确认模式与日志格式一致。生产环境务必处理 `_grokparsefailure` 标签事件。
- **Mutate 顺序问题**:记住 Mutate 内部操作有固定执行顺序,不受配置书写位置影响。遇到 rename 和 convert 冲突时,拆成两个 mutate 块。
- **Date 时区偏移**:`@timestamp` 默认是 UTC 时间,查询时注意时区换算。如果业务强依赖本地时间,在 date 过滤器中指定 timezone。
- **GeoIP 数据库过旧**:Logstash 内置的 GeoLite2 不会自动更新,地理信息不准确时需要手动下载最新数据库。
- **性能瓶颈**:Grok 是 CPU 密集型操作,复杂模式会导致吞吐量下降。可以考虑用 dissect 过滤器替代简单格式的 Grok 匹配,dissect 基于分隔符定位,性能更好。服务端5月27日 14:20
如何从零编写一个完整的 Logstash 配置文件?当你在凌晨两点被叫醒,因为日志管道断了——那一刻你会意识到,理解 Logstash 配置文件的每一行到底在做什么,不是锦上添花,而是生存技能。
## Logstash 配置文件的三段式骨架
Logstash 的配置文件本质上只做三件事:从哪读数据、怎么处理数据、往哪写数据。对应的就是 `input`、`filter`、`output` 三个区块,数据像流水线一样依次穿过它们。
```conf
input {
# 数据从哪里来
}
filter {
# 数据怎么加工(可选)
}
output {
# 数据到哪里去
}
```
其中 `input` 和 `output` 是必需的,`filter` 可以省略。三个区块的声明顺序固定为 input-filter-output,但 Logstash 并不强制——只是惯例如此,调换位置也能运行,只是阅读和维护时会非常混乱。
## input:数据的入口
input 插件决定数据源的类型和接入方式。以下是生产环境最常用的四种。
### file 插件——读取本地日志
```conf
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
tags => ["nginx"]
}
}
```
`start_position` 控制首次读取是从文件头还是文件尾开始;`sincedb_path` 记录已读取的文件偏移量,避免重启后重复消费。如果想在测试时每次都从头读,把 `sincedb_path` 设为 `/dev/null`。
### beats 插件——接收 Elastic Agent 数据
```conf
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}
```
这是 Elastic Stack 生态中最主流的采集方式。Filebeat、Metricbeat 等轻量采集器将数据推送到这个端口,Logstash 作为中继做进一步加工。
### kafka 插件——消费消息队列
```conf
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["app-logs", "access-logs"]
group_id => "logstash-consumer"
consumer_threads => 4
decorate_events => true
}
}
```
从 Kafka 消费数据适合高吞吐、解耦的场景。`decorate_events` 为 true 时会在事件中添加 Kafka 元数据(topic、partition、offset),便于后续追溯。
### http 插件——接收 HTTP 推送
```conf
input {
http {
port => 8080
codec => json
additional_codecs => { "application/json" => "json" }
}
}
```
适用于应用主动推送 JSON 日志的场景,比如 Webhook 回调或自定义 SDK 上报。
## filter:数据的加工车间
filter 是 Logstash 最有价值的部分,负责把非结构化的原始数据变成可查询的结构化字段。
### grok 插件——正则解析日志
```conf
filter {
grok {
match => {
"message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status:int} %{NUMBER:duration:float}ms"
}
overwrite => ["message"]
}
}
```
grok 基于正则表达式,但提供了大量预定义模式(如 `%{IP}`、`%{TIMESTAMP_ISO8601}`、`%{GREEDYDATA}`),避免从零写正则。解析失败时会自动添加 `_grokparsefailure` 标签,可以据此做告警。
常用内置模式速查:
| 模式 | 匹配内容 | 示例 |
|------|----------|------|
| `%{IP}` | IPv4/IPv6 地址 | 192.168.1.1 |
| `%{HOSTNAME}` | 主机名 | web-server-01 |
| `%{TIMESTAMP_ISO8601}` | ISO 时间戳 | 2024-01-15T10:30:00 |
| `%{GREEDYDATA}` | 贪婪匹配剩余内容 | 任意字符串 |
| `%{QUOTEDSTRING}` | 带引号字符串 | "hello world" |
### mutate 插件——字段变换
```conf
filter {
mutate {
rename => { "resp_code" => "http_status" }
remove_field => ["headers", "cookies"]
lowercase => ["request_path"]
strip => ["user_input"]
copy => { "source_ip" => "client_ip" }
}
}
```
mutate 做的是"脏活":重命名字段让语义更清晰、删掉无用字段减少存储、大小写转换统一格式。这些操作琐碎但直接影响后续查询体验。
### date 插件——时间戳解析
```conf
filter {
date {
match => ["log_time", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
```
date 插件将日志中的时间字符串解析为 Logstash 事件的标准 `@timestamp` 字段。这一步至关重要——如果没有正确解析时间,Elasticsearch 中的时序查询和索引路由都会出错。注意 `match` 的格式必须与日志中的实际格式完全对应,否则静默失败。
## output:数据的出口
### elasticsearch 插件——写入 ES
```conf
output {
elasticsearch {
hosts => ["http://es-node1:9200", "http://es-node2:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "${ES_PASSWORD}"
ssl => true
ssl_certificate_verification => false
action => "create"
}
}
```
`index` 中的 `%{+YYYY.MM.dd}` 会根据事件的 `@timestamp` 动态生成按天分割的索引名。`action => "create"` 保证相同文档 ID 只写入一次,避免重复。密码等敏感信息用 `${ENV_VAR}` 从环境变量读取,不要硬编码在配置文件里。
### file 插件——落盘归档
```conf
output {
file {
path => "/data/archive/%{type}-%{+YYYY-MM-dd}.log"
codec => line { format => "%{message}" }
}
}
```
适合需要将处理后的数据持久化到文件系统的场景,如审计日志归档、合规数据留存。
### stdout 插件——调试利器
```conf
output {
stdout {
codec => rubydebug
}
}
```
`rubydebug` codec 会以结构化的可读格式输出事件的全部字段,是排查配置问题的第一工具。调试时可以用它替代正式 output,确认 filter 解析结果正确后再切换回去。
## 条件判断:让配置具备分支逻辑
实际场景中,不同来源的日志需要不同的处理路径。Logstash 支持在 filter 和 output 中使用 `if`/`else if`/`else` 条件判断。
```conf
filter {
if [type] == "nginx" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate { remove_field => ["message"] }
} else if [type] == "app" {
json {
source => "message"
}
date {
match => ["timestamp", "ISO8601"]
}
} else {
drop { }
}
}
output {
if "_grokparsefailure" in [tags] {
file {
path => "/var/log/logstash/parse-failed-%{+YYYY-MM-dd}.log"
}
}
elasticsearch {
hosts => ["localhost:9200"]
index => "%{type}-%{+YYYY.MM.dd}"
}
}
```
条件表达式支持的比较运算符:`==`、`!=`、`<`、`>`、`<=`、`>=`,以及正则匹配 `=~` 和 `!~`。逻辑运算符为 `and`、`or`、`not`。可以用 `[field]` 引用事件字段,`in` 判断数组包含关系。
一个实用模式:将 grok 解析失败的事件单独输出到文件,既不丢数据,又不污染主索引。
## 多管道配置:隔离不同的数据流
当一条 Logstash 实例需要处理多种互不相干的数据流时,用多管道(pipelines)替代单管道内的条件分支会更清晰。
在 `config/pipelines.yml` 中定义:
```yaml
- pipeline.id: nginx-pipeline
path.config: "/etc/logstash/conf.d/nginx.conf"
pipeline.workers: 4
pipeline.batch.size: 250
- pipeline.id: app-pipeline
path.config: "/etc/logstash/conf.d/app.conf"
pipeline.workers: 2
pipeline.batch.size: 125
```
每个管道有独立的配置文件、worker 线程数和 batch 大小,互不干扰。如果某个管道的 filter 出了问题,不会拖垮其他管道。
管道之间还可以通过 `pipeline` input/output 插件通信:
```conf
# 管道 A 的 output
output {
pipeline {
send_to => [enrichment]
}
}
# 管道 B 的 input
input {
pipeline {
address => enrichment
}
}
```
## 性能调优的关键参数
配置写对了只是第一步,跑得稳才是生产环境的要求。
### pipeline.workers
默认值是 CPU 核心数。对于 CPU 密集型的 filter(尤其是 grok),不要盲目调大——worker 过多会导致上下文切换开销增大。一般设为 CPU 核数或略低即可。
### pipeline.batch.size
每次批量处理的事件数,默认 125。调大可以提高吞吐量,但会增加内存占用和单次处理延迟。对于 grok 较重的场景,建议从 125 开始逐步调到 250-500 观察效果。
### pipeline.batch.delay
批次等待时间,默认 50ms。降低这个值可以减少延迟,但可能让批次更小、吞吐下降。对延迟敏感的场景可以调到 10-20ms。
### queue.type
默认是内存队列(memory),重启丢数据。生产环境建议用持久化队列(persisted):
```conf
queue.type: persisted
path.queue: /data/logstash/queue
queue.max_bytes: 4gb
```
持久化队列将事件写入磁盘,Logstash 异常退出后可以从断点恢复,代价是吞吐量下降约 10-20%。
### grok 的性能陷阱
grok 是 Logstash 中最耗 CPU 的插件。两个优化方向:
一、将多个 grok match 拆成按条件分支执行,避免每条事件都跑完所有正则:
```conf
filter {
if [type] == "syslog" {
grok { match => { "message" => "%{SYSLOGLINE}" } }
} else if [type] == "apache" {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
}
}
```
二、用 `patterns_dir` 加载自定义模式,将复杂正则拆分成命名片段,既提升可读性也便于缓存复用。
## 一个完整的配置示例
以下是一个涵盖了上述所有要点的生产级配置:
```conf
input {
beats {
port => 5044
type => "beats"
}
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["app-logs"]
group_id => "logstash-consumer"
consumer_threads => 3
decorate_events => true
type => "kafka-app"
}
}
filter {
if [type] == "beats" and "nginx" in [tags] {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
mutate {
remove_field => ["message", "prospector"]
}
} else if [type] == "kafka-app" {
json {
source => "message"
}
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
mutate {
rename => { "lvl" => "log_level" }
lowercase => ["log_level"]
}
}
if "_grokparsefailure" in [tags] or "_jsonparsefailure" in [tags] {
mutate {
add_field => { "parse_error" => "true" }
}
}
}
output {
if [parse_error] == "true" {
file {
path => "/var/log/logstash/failed-%{+YYYY-MM-dd}.log"
codec => line { format => "%{message}" }
}
} else {
elasticsearch {
hosts => ["http://es-node1:9200", "http://es-node2:9200"]
index => "%{type}-%{+YYYY.MM.dd}"
user => "elastic"
password => "${ES_PASSWORD}"
}
}
stdout {
codec => rubydebug
}
}
```
理解 Logstash 配置的关键不是记住多少插件参数,而是建立起 input-filter-output 的思维模型:数据从哪来、到哪去、中间怎么变。在这个框架下,每个插件只是填空题。遇到问题时,按这个顺序排查:数据进来了吗(查 input 日志)?字段解析对了吗(用 stdout + rubydebug 看)?写进目标了吗(查 output 日志和目标系统)?三步定位,比盲目改配置高效得多。