标签

Logstash

Logstash是一个开源的数据收集引擎,主要用于处理和转换各种日志和事件数据。它支持从多种来源(如文件、网络、消息队列和数据库等)收集数据,并将其转换为统一的格式,以便存储、分析和可视化。Logstash提供了大量的插件和过滤器,可以用于数据转换、数据清洗、数据标准化和数据增强等方面。它还支持多种输出,如Elasticsearch、Redis、Kafka和Splunk等。Logstash可以与Elasticsearch和Kibana等开源工具集成,形成一个完整的ELK(Elasticsearch、Logstash和Kibana)堆栈,用于搜索、分析和可视化数据。Logstash适用于多种场景,如安全监控、日志管理、应用性能监控和业务分析等。

Logstash
服务端5月27日 20:20
Logstash 在 ELK Stack 中扮演什么角色,与 Elasticsearch 和 Kibana 如何协作?## 答案 Logstash 是 ELK Stack 的数据采集与处理管道,负责从多种数据源收集日志,经解析、过滤、转换后输出到 Elasticsearch;Elasticsearch 承担索引存储与全文检索;Kibana 提供可视化与交互界面。三者协作:数据源 → Logstash(采集+处理)→ Elasticsearch(存储+检索)→ Kibana(可视化)。 ## Logstash 的核心职责 Logstash 基于 input-filter-output 三段式管道: - **Input**:从文件、syslog、Kafka、Beats 等数据源采集原始日志 - **Filter**:用 Grok 解析非结构化日志为结构化字段,用 Mutate 修改字段,用 Date 标准化时间,用 GeoIP 丰富地理信息 - **Output**:将处理后的数据写入 Elasticsearch,也可同时输出到 Kafka、文件等 配置示例: ``` input { beats { port => 5044 } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } mutate { remove_field => ["tags"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "app-logs" } } ``` ## 三者协作的关键机制 Logstash 与 Elasticsearch 通过 REST API 批量写入,默认按批次刷写提升吞吐。引入 Kafka 或 Redis 作为缓冲层可解耦采集与写入,应对流量突增。Elasticsearch 为 Kibana 提供查询与聚合 API,Kibana 基于此构建仪表板和告警。 当 Logstash 处理能力不足时,可用 Beats 替代其采集角色,Logstash 专注过滤转换,形成 Beats → Logstash → Elasticsearch 的分层架构。Logstash 还支持持久化队列(PQ)防止数据丢失,死信队列(DLQ)捕获处理失败的事件。 ## 追问 **Q: Logstash 与 Fluentd 怎么选?** Logstash 插件生态丰富(200+),适合复杂 ETL;Fluentd 基于 CRuby 更轻量,Kubernetes 环境下常用 Fluentd 替代 Logstash。 **Q: 如何排查 Logstash 管道性能瓶颈?** 用 `--config.test_and_exit` 验证配置,`--log.level debug` 观察事件流,调整 `pipeline.workers`(建议等于 CPU 核数)和 `pipeline.batch.size`,监控队列积压。 **Q: Logstash 如何保证数据不丢失?** 开启持久化队列(`queue.type: persisted`),事件先写磁盘再处理;配合死信队列捕获解析失败的事件,避免静默丢弃。
服务端5月27日 18:26
Logstash 有哪些常用的插件,如何安装和管理插件?Logstash 的强大之处在于它的插件体系——输入、过滤、输出三大类插件覆盖了从数据采集到写入目标的全链路。面试中经常问到"Logstash 有哪些常用插件""怎么安装和管理插件",下面结合实际使用场景梳理清楚。 ## Input 插件:数据从哪里来? Input 插件决定 Logstash 从哪个数据源读取数据。选对 Input 插件是搭建 Pipeline 的第一步。 ### file —— 读日志文件 file 是最基础的 Input 插件,行为类似 `tail -f`,持续读取文件新增内容: ```conf input { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" } } ``` 几个关键参数: - `start_position`:首次读取时从文件开头(`beginning`)还是末尾(`end`)开始,默认 `end` - `sincedb_path`:记录已读取位置的文件路径,重启后从断点续读;设为 `/dev/null` 则每次从头读 - `mode`:默认 `tail` 模式持续追踪,设为 `read` 则读完即退出 ### beats —— 接收 Beats 数据 Beats 是 Elastic 官方的轻量采集器家族(Filebeat、Metricbeat 等),beats 插件是 Logstash 与 Beats 配合的标准方式: ```conf input { beats { port => 5044 } } ``` 生产环境中,Beats 负责在各服务器上采集数据,再统一发送到 Logstash 做集中处理,这是 ELK 架构中最常见的组合。 ### kafka —— 从 Kafka 消费消息 当数据量大、需要缓冲或多个消费者协同工作时,Kafka 是首选的中间层: ```conf input { kafka { bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092" topics => ["app-logs", "business-events"] group_id => "logstash-consumer" consumer_threads => 4 } } ``` ### 其他常用 Input 插件 | 插件 | 典型场景 | |------|---------| | jdbc | 定时从关系型数据库拉取增量数据 | | http | 对外暴露 HTTP 接口,接收外部系统推送的数据 | | tcp / udp | 接收网络协议数据,常用于收集 syslog | | syslog | 专门解析 syslog 格式日志 | | redis | 从 Redis List 或 Channel 读取数据 | | elasticsearch | 从 ES 中查询数据做二次处理 | | s3 | 从 AWS S3 桶读取归档日志 | ## Filter 插件:数据怎么加工? Filter 插件负责把非结构化的原始数据转换成结构化、可搜索的字段。这是 Logstash 最核心的能力。 ### grok —— 解析非结构化日志 grok 是使用频率最高的 Filter 插件,通过正则表达式模式把文本拆解成字段: ```conf filter { grok { match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request_uri} %{NUMBER:response_code:int} %{NUMBER:bytes:int}" } tag_on_failure => ["_grokparsefailure"] } } ``` 关键点: - Logstash 内置了大量命名模式(如 `IP`、`HOSTNAME`、`COMBINEDAPACHELOG`),优先使用内置模式 - `tag_on_failure`:匹配失败时打上标签,方便后续排查未解析的日志 - 性能瓶颈:grok 基于正则,匹配复杂模式时 CPU 开销大,大规模数据场景下可考虑用 dissect 替代 ### mutate —— 字段操作 mutate 用于对字段进行增删改查,是日常配置中使用最频繁的 Filter 之一: ```conf filter { mutate { rename => { "old_field" => "new_field" } remove_field => ["unused_field"] convert => { "response_code" => "integer" "latency" => "float" } gsub => [ "message", "\", "/" ] } } ``` ### date —— 时间戳解析 日志中的时间格式五花八门,date 插件负责将字符串时间解析为 Logstash 事件的时间戳: ```conf filter { date { match => ["log_time", "yyyy-MM-dd HH:mm:ss", "ISO8601"] target => "@timestamp" timezone => "Asia/Shanghai" } } ``` 注意:如果不确定时间格式,可以传多个模式数组,date 插件会依次尝试匹配。 ### 其他常用 Filter 插件 | 插件 | 作用 | |------|------| | json | 解析 JSON 字符串为字段 | | csv | 按分隔符拆分 CSV 格式数据 | | geoip | 根据 IP 查询地理位置 | | useragent | 解析浏览器 User-Agent | | ruby | 用 Ruby 代码实现复杂逻辑(性能敏感场景慎用) | | aggregate | 跨事件聚合,如关联同一个请求的多条日志 | | dissect | 类似 grok 但基于固定分隔符,性能更好 | | drop | 直接丢弃不需要的事件 | | fingerprint | 给事件生成唯一标识 | ## Output 插件:数据送到哪里去? ### elasticsearch —— 写入 Elasticsearch 这是最常用的 Output 插件,生产环境中几乎必用: ```conf output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "app-logs-%{+YYYY.MM.dd}" template => "/etc/logstash/templates/es-template.json" action => "create" retry_on_conflict => 3 } } ``` 关键参数: - `index`:支持按时间动态生成索引名,`%{+YYYY.MM.dd}` 按天分索引 - `action`:`index`(默认,覆盖写入)或 `create`(仅当文档不存在时写入,防止重复) - `retry_on_conflict`:版本冲突时重试次数 ### kafka —— 写入 Kafka ```conf output { kafka { bootstrap_servers => "kafka-broker1:9092" topic_id => "processed-logs" codec => json compression_type => "snappy" } } ``` ### 其他常用 Output 插件 | 插件 | 场景 | |------|------| | file | 写入本地文件 | | http | 推送到外部 HTTP API | | redis | 写入 Redis | | stdout | 控制台输出,调试时常用 | | email | 触发告警邮件 | | s3 | 归档到 AWS S3 | | mongodb | 写入 MongoDB | ## Codec 插件:数据的编解码 Codec 插件常被忽略,但它影响着数据的序列化方式。常用的是 json 和 multiline: ```conf input { file { path => "/var/log/app.log" codec => multiline { pattern => "^%{TIMESTAMP_ISO8601}" negate => true what => "previous" } } } ``` multiline 用于把 Java 堆栈信息等多行日志合并为一条事件,`pattern` 匹配新日志行的开头,`what => "previous"` 表示不匹配的行归入上一条事件。 ## 插件安装和管理 Logstash 通过 `bin/logstash-plugin` 命令管理插件生命周期: ### 查看已安装插件 ```bash # 列出所有插件 bin/logstash-plugin list # 带版本号 bin/logstash-plugin list --verbose # 按分组查看 bin/logstash-plugin list --group input # 模糊搜索 bin/logstash-plugin list '*kafka*' ``` ### 安装插件 ```bash # 从 RubyGems 安装 bin/logstash-plugin install logstash-output-s3 # 指定版本 bin/logstash-plugin install logstash-output-s3 --version 10.0.0 # 从本地 gem 文件安装 bin/logstash-plugin install /path/to/logstash-output-custom-1.0.0.gem ``` ### 更新和卸载 ```bash # 更新全部插件 bin/logstash-plugin update # 更新指定插件 bin/logstash-plugin update logstash-output-s3 # 卸载插件(Logstash 7.x+ 中部分插件为集成插件,不可卸载) bin/logstash-plugin uninstall logstash-output-s3 ``` ### 离线安装 生产环境通常无法访问外网,需要制作离线安装包: ```bash # 在有网络的机器上生成离线包 bin/logstash-plugin prepare-offline-pack logstash-output-s3 # 在目标机器上安装 bin/logstash-plugin install file:///path/to/logstash-offline-plugins.zip ``` ## 插件配置的实战经验 ### 条件判断让 Pipeline 更高效 不同类型的日志走不同的 Filter 逻辑,避免无关插件浪费算力: ```conf filter { if [type] == "nginx-access" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } geoip { source => "clientip" } } else if [type] == "app-error" { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" } } if [level] == "ERROR" { mutate { add_field => { "alert" => "true" } } } } } ``` ### grok 解析失败的兜底处理 grok 匹配失败是线上常见问题,必须处理: ```conf filter { grok { match => { "message" => "%{PATTERN:field}" } tag_on_failure => ["_grokparsefailure"] } if "_grokparsefailure" in [tags] { mutate { add_field => { "parse_error" => "true" } } } } ``` ### 性能优化要点 - **优先用 dissect 替代 grok**:当日志格式固定时,dissect 基于分隔符拆分,性能比 grok 高出一个量级 - **减少 mutate 调用次数**:合并多个 mutate 操作到一个块内,减少事件反复处理 - **慎用 ruby 插件**:Ruby 代码执行效率远低于内置插件,只在无法用其他插件实现时才用 - **合理配置 pipeline.workers 和 pipeline.batch.size**:workers 数量通常设为 CPU 核心数,batch.size 根据事件大小调整(默认 125,大事件可适当调小) - **关注慢 Filter**:在 `logstash.yml` 中开启 `config.debug` 和慢日志,定位瓶颈插件 ## 自定义插件开发 当内置插件无法满足需求时(比如对接公司内部系统),就需要开发自定义插件。 ### 创建插件骨架 ```bash gem install logstash-plugin-generator logstash-plugin generate --type input --name myinput ``` 生成的目录结构: ``` logstash-input-myinput/ ├── lib/ │ └── logstash/ │ └── inputs/ │ └── myinput.rb ├── spec/ │ └── inputs/ │ └── myinput_spec.rb ├── Gemfile ├── logstash-input-myinput.gemspec └── README.md ``` ### 核心实现 一个 Input 插件至少实现 `register` 和 `run` 两个方法: ```ruby class LogStash::Inputs::Myinput < LogStash::Inputs::Base config_name "myinput" config :host, :validate => :string, :default => "0.0.0.0" config :port, :validate => :number, :required => true def register # 初始化资源,只在启动时调用一次 @server = TCPServer.new(@host, @port) end def run(queue) # 持续运行,产生事件后推入 queue loop do client = @server.accept while line = client.gets event = LogStash::Event.new("message" => line.chomp) decorate(event) queue << event end client.close end end def stop # 优雅关闭 @server.close if @server end end ``` ### 构建与安装 ```bash gem build logstash-input-myinput.gemspec bin/logstash-plugin install logstash-input-myinput-1.0.0.gem ``` ### 编写测试 ```ruby require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/myinput" describe LogStash::Inputs::Myinput do let(:config) { { "port" => 9999 } } it "registers without error" do input = described_class.new(config) expect { input.register }.not_to raise_error end end ``` ## 插件版本管理的注意事项 - **集成插件不可卸载**:Logstash 7.x 之后,Kafka、Beats 等常用插件被合并为集成插件(`logstash-integration-kafka`),无法通过 uninstall 移除 - **锁定版本**:在 `Gemfile` 中指定版本避免升级引入兼容问题:`gem "logstash-output-s3", "~> 10.0"` - **升级策略**:`bin/logstash-plugin update` 默认只升级小版本和补丁版本,不会跨大版本升级,降低破坏性变更风险 - **查看插件版本**:`bin/logstash-plugin list --verbose | grep 插件名` 掌握 Logstash 插件体系的关键在于理解 Input-Filter-Output 的数据流模型,以及每个插件在链路中的定位。日常使用中,grok 和 mutate 是最需要熟练掌握的 Filter 插件,elasticsearch output 是最核心的输出插件,而插件管理命令则保证你能灵活扩展和维护 Pipeline。
服务端5月27日 18:24
Logstash 性能怎么调?从瓶颈定位到参数优化的实战方案Logstash 吞吐量上不去,CPU 打满却处理不完日志,这类问题在生产环境里太常见了。很多团队第一反应是加机器,但多数情况下调对参数就能让现有资源发挥出两三倍的吞吐。 这篇文章从实际踩坑经验出发,讲清楚 Logstash 性能瓶颈怎么定位、各参数怎么调、调了之后有什么效果。读完你会知道:什么时候该调 pipeline.workers,什么时候该加 Kafka 缓冲,G1GC 到底有没有用,以及那些看起来合理但实际拖慢速度的配置。 ## 先定位瓶颈再动手 调优最忌讳盲目改参数。动手之前,先用 Logstash 自带的监控 API 看清楚瓶颈在哪: ```bash curl -s localhost:9600/_node/stats/pipelines | jq '.pipelines.main' ``` 重点关注这几个指标: | 指标 | 含义 | 健康范围 | |------|------|----------| | `events.in` | 每秒摄入事件数 | 接近输入源速率 | | `events.out` | 每秒输出事件数 | 与 `events.in` 基本持平 | | `events.filtered` | 过滤后事件数 | 合理的过滤率 | | `pipeline.workers` 活跃数 | 当前工作线程 | 等于配置值 | | `queue.type` | 队列类型 | memory 或 persisted | 如果 `events.in` 远大于 `events.out`,说明处理速度跟不上摄入速度,瓶颈在 filter 或 output。如果 CPU 使用率低但吞吐上不去,问题可能出在 I/O 等待或网络延迟上。 ## JVM 调优:堆内存和 GC 怎么配 Logstash 跑在 JVM 上,内存配置直接影响性能。在 `config/jvm.options` 里调整: ```bash -Xms4g -Xmx4g -XX:+UseG1GC ``` **堆内存设置原则**: - Xms 和 Xmx 设成一样的值。动态扩缩容会触发 Full GC,导致处理暂停,日志管道会短暂卡顿 - 堆内存不要超过物理内存的 50%。Logstash 本身还需要堆外内存做缓冲区和网络 I/O,堆太大会让操作系统可用内存不足,反而触发 swap - 大多数场景 4-8GB 就够了。超过 8GB 不一定更好——堆越大,GC 扫描的时间越长,G1GC 在 4-8GB 区间表现最好 **G1GC 是否值得开?** 实测下来,G1GC 相比默认的 Parallel GC,在堆内存 4GB 以上时 Full GC 暂停时间从秒级降到百毫秒级。但如果堆只有 2GB,G1GC 的分区管理开销反而可能让吞吐量下降 5%-10%。所以:4GB 以上开 G1GC,2GB 以下用默认的就行。 **一个容易踩的坑**:如果日志里有大量 Grok 解析失败,会产生异常对象堆积在堆里。这时候调大堆只是延缓问题,根本办法是修 Grok 模式或用 `if` 条件跳过不需要解析的日志。 ## Pipeline 参数:workers、batch size、delay 怎么平衡 这三个参数互相影响,单独调一个往往看不到效果。 ### pipeline.workers ```conf # logstash.yml pipeline.workers: 4 ``` 这是处理事件的线程数。默认值是 CPU 核心数,但有个前提:你的 filter 和 output 插件必须是线程安全的。大多数官方插件没问题,但自定义插件需要确认。 实际调法:先设成 CPU 核心数跑基准测试,然后分别试 `核心数/2` 和 `核心数*2`,看哪个 EPS 最高。经验上,filter 重(大量 Grok 正则)的场景设成核心数就行,filter 轻但 output 重(往 ES 写入)的场景可以适当加倍。 ### pipeline.batch.size ```conf pipeline.batch.size: 125 ``` 每个 worker 一次拿多少事件来处理。默认 125 是个保守值。增大 batch size 能减少事件调度开销,提高吞吐量: - **高吞吐场景**(日志量 > 10万/分钟):调到 500-1000 - **低延迟场景**(实时告警):保持 125 或更小 batch size 不是越大越好。过大的 batch 会导致单个批次处理时间变长,增加事件从进入到输出端的端到端延迟。而且如果 filter 里有 Grok 失败的情况,大 batch 会让重试开销放大。 ### pipeline.batch.delay ```conf pipeline.batch.delay: 50 ``` worker 等待多久凑够一个 batch 再开始处理,单位毫秒。默认 50ms。这个参数的意义是:当事件流入速度不够快时,等一等能凑满 batch,减少处理次数。 - 事件流入速度很快:delay 可以降到 10-20ms,减少等待 - 事件流入速度慢但实时性要求高:降到 5ms 甚至 1ms - 事件流入速度慢且不要求实时:保持 50ms,省 CPU **三者联动经验**:高吞吐场景用 `workers=核心数, batch.size=500, batch.delay=10`;低延迟场景用 `workers=4, batch.size=50, batch.delay=5`。改一个参数时保持其他两个不变,观察 EPS 变化,找到拐点。 ## Filter 优化:减少无用功 Filter 是 Logstash 最容易成为瓶颈的环节,尤其是 Grok。 ### Grok 是性能杀手 Grok 底层是正则表达式,每条日志都要跑一遍模式匹配。优化 Grok 的方法: 1. **用 `if` 条件跳过不需要 Grok 的日志**: ```conf filter { if [type] == "nginx_access" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } } ``` 这看起来简单,但实际效果可能比调参数还明显。一条不需要 Grok 的日志跳过正则匹配,省下的是毫秒级的 CPU 时间。 2. **自定义模式比组合内置模式快**:内置的 COMBINEDAPACHELOG 实际上是多个小模式拼接的,每次匹配都要逐个尝试。写成一条自定义模式能减少匹配次数: ```conf # 自定义模式文件 NGINX_ACCESS %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes} ``` 3. **把最常匹配的模式放前面**:Grok 是按顺序尝试匹配的,最可能的模式放第一个能最快命中。 4. **用 dissect 替代简单格式的 Grok**:如果日志格式是固定的分隔符(如管道符、逗号分隔),dissect 插件用分隔符切分,比正则匹配快 5-10 倍: ```conf filter { dissect { mapping => { "message" => "%{ip} | %{user} | %{action}" } } } ``` ### 其他 Filter 优化 - **mutate 的 remove_field**:尽早删掉不需要的字段,减少后续处理的数据量 - **用 drop 过滤器丢弃无用事件**:在 filter 链最前面丢弃,比处理完再丢省很多资源 - **避免重复解析**:如果上游已经做过 JSON 解析,不要再用 json filter 再解析一遍 ## Output 优化:往 ES 写数据的讲究 ES output 是最常见的瓶颈之一。 ### 批量写入参数 ```conf output { elasticsearch { hosts => ["http://es-cluster:9200"] http_compression => true } } ``` 注意:旧版本的 `flush_size` 和 `idle_flush_time` 参数已经在 7.x 之后废弃,改由 pipeline 的 batch size 和 batch delay 统一控制。如果你还在用这两个参数,升级后删掉,否则会有告警。 **http_compression => true** 这个一定要开。压缩后网络传输量减少 60%-80%,对跨机房写入场景效果尤其明显,CPU 开销可以忽略。 ### 连接池调优 如果 ES 集群有多个节点,Logstash 会自动轮询写入。但默认连接池大小可能不够,高并发场景下可以在 ES output 里显式配置: ```conf output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200", "http://es-node3:9200"] http_compression => true # 新版本支持的批量操作参数 action => "index" } } ``` ## 持久队列:防数据丢失的最后防线 ```conf # logstash.yml queue.type: persisted path.queue: /data/logstash/queue queue.page_capacity: 250mb queue.max_events: 0 queue.max_bytes: 4gb ``` 持久队列把事件写到磁盘,Logstash 重启或崩溃时不丢数据。代价是吞吐量下降 10%-20%,因为每次事件要写磁盘。 什么场景该开持久队列: - 数据不能丢(金融日志、审计日志) - 下游 ES 不稳定,偶尔写入失败 - Logstash 重启频繁 什么场景可以不开: - 日志允许少量丢失(纯分析用途的访问日志) - 下游写入非常稳定 - 对吞吐量有极致要求 ## 架构层面:加缓冲和水平扩展 单机调优总有上限。当一台 Logstash 处理不过来,架构上的调整比继续压单机更有效。 ### 加 Kafka 缓冲 ``` Filebeat → Kafka → Logstash → Elasticsearch ``` Kafka 在中间起两个作用:一是缓冲突发流量,Logstash 处理不过来时 Kafka 先存着;二是解耦,上游采集和下游处理互不影响。 Kafka 场景下的 Logstash 配置要点: ```conf input { kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["nginx-logs"] group_id => "logstash-nginx" consumer_threads => 4 auto_offset_reset => "earliest" } } ``` `consumer_threads` 建议设成 Kafka 分区数。如果分区数是 12,设 12 个 consumer 线程能充分利用并行消费。 ### 多实例水平扩展 起多个 Logstash 实例,用负载均衡或 Kafka consumer group 分流: - 如果输入源是 Kafka:每个 Logstash 实例配相同的 `group_id`,Kafka 自动分配分区给不同实例 - 如果输入源是 Beats:在 Beats 和 Logstash 之间加一层 Nginx 或 HAProxy 做 TCP 负载均衡 ### 用 Beats 替代 Logstash 做采集 Filebeat、Metricbeat 比 Logstash 轻量得多,资源占用大约是 Logstash 的 1/10。架构上让 Beats 做采集、Logstash 做处理,比让 Logstash 又采集又处理高效得多。 ## 怎么验证优化效果 每次只改一个参数,跑一轮基准测试对比。用 Logstash 自带的 generator 输入插件做压测: ```conf input { generator { lines => ["192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /api/users HTTP/1.1\" 200 2326"] count => 1000000 } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } output { stdout { codec => dots } } ``` 跑完后看输出的时间,算出 EPS(每秒处理事件数)。把这个值作为基准,改一个参数再跑,对比变化。 生产环境监控关键指标:`events.in` 和 `events.out` 的差值(积压量)、JVM 堆使用率、GC 频率和耗时。如果堆使用率持续超过 75% 或 Full GC 频率超过每分钟一次,说明要么堆太小,要么 filter 有内存泄漏。 调优没有一劳永逸的方案。日志格式变了、流量模式变了、ES 集群扩容了,都可能让之前的调优配置不再最优。养成定期看监控、定期跑基准测试的习惯,比任何单次调优都重要。
服务端5月27日 18:22
Logstash 有哪些常用过滤器?Grok 和 Mutate 怎么用?## Grok 过滤器:把非结构化日志变成结构化数据 Grok 是 Logstash 中使用频率最高的过滤器,核心能力是将一行纯文本日志拆解成有名字段的 JSON。它的底层原理是基于正则表达式的模式匹配,但 Elastic 已经预置了大量常用模式,日常使用不需要手写正则。 ### 基本匹配 最典型的场景是解析 Apache/Nginx 访问日志。`COMBINEDAPACHELOG` 是内置模式,一条配置就能提取 clientip、response_code、bytes 等十几个字段: ```conf filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } ``` 匹配成功后,原来 message 字段中的一整行日志会被拆成 `clientip`、`ident`、`auth`、`timestamp`、`verb`、`request`、`httpversion`、`response`、`bytes` 等独立字段,后续过滤器和输出插件都能直接引用。 ### 多模式备选 实际生产中,日志格式往往不止一种。Grok 支持传入一个模式数组,按顺序依次尝试匹配,命中的第一个生效: ```conf filter { grok { match => { "message" => [ "%{COMBINEDAPACHELOG}", "%{COMMONAPACHELOG}", "%{SYSLOGBASE} %{GREEDYDATA:message}" ] } } } ``` 这种方式比写一个超长的"万能正则"更易维护,哪条模式匹配了也更容易排查。 ### 自定义模式 当内置模式无法满足需求时,可以在外部文件中定义自己的模式。模式文件的语法是 `PATTERN_NAME regex`,一行一个: ```conf filter { grok { patterns_dir => ["/etc/logstash/patterns"] match => { "message" => "%{MYAPP_LOG:myapp_field}" } } } ``` 对应的 `/etc/logstash/patterns/myapp` 文件内容示例: ``` MYAPP_LOG \[%{TIMESTAMP_ISO8601:timestamp}\] \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg} ``` ### Grok 匹配失败怎么办 Grok 匹配失败时,Logstash 会自动给事件打上 `_grokparsefailure` 标签。在生产环境中,应该用条件判断捕获这些失败事件,避免脏数据静默进入 Elasticsearch: ```conf filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } if "_grokparsefailure" in [tags] { mutate { add_field => { "parse_error" => "grok failed for message" } } } } ``` 也可以借助 Kibana 自带的 Grok Debugger 工具调试模式,避免反复重启 Logstash。 ## Mutate 过滤器:字段级别的加工工具 Mutate 是"万能修理工",几乎所有的字段增删改操作都能靠它完成。它不关心数据来源,只对已有字段做变换。 ### 重命名字段 从 Beats 或其他输入源拿到的字段名不符合规范时,用 rename 统一命名: ```conf filter { mutate { rename => { "client_ip" => "source_ip" } } } ``` ### 类型转换 Grok 解析出来的字段默认都是字符串类型,想做数值聚合或范围查询,必须先转换类型: ```conf filter { mutate { convert => { "response" => "integer" "request_time" => "float" } } } ``` 这一步经常被忽略,导致 Elasticsearch 中所有字段都是 keyword 类型,数值范围查询直接失效。 ### 删除无用字段 每个事件默认携带 message、@version、host 等字段。如果已经用 Grok 把 message 拆成了独立字段,原始 message 留着只会浪费存储: ```conf filter { mutate { remove_field => ["message", "@version", "host"] } } ``` ### 替换和追加字段 replace 会覆盖已有字段或新建字段,add_field 则是在原有字段基础上追加: ```conf filter { mutate { replace => { "log_source" => "production-nginx" } add_field => { "environment" => "prod" "pipeline" => "nginx-access" } } } ``` ### gsub:正则替换字段内容 Mutate 自带 gsub 操作,可以对字段值做正则替换,不需要动用 Ruby 过滤器: ```conf filter { mutate { gsub => [ "request", "\\?.+$", "" ] } } ``` 这会把 `/api/users?page=1&size=10` 替换为 `/api/users`,去掉查询参数部分,便于按路径做聚合统计。 ### 大小写转换与分割 ```conf filter { mutate { uppercase => ["log_level"] split => { "tags" => "," } } } ``` uppercase 将字段值转为大写,split 按指定分隔符将字符串拆成数组。这两个操作在数据规范化场景中很常用。 ### Mutate 各操作的执行顺序 Mutate 内部有固定的操作执行顺序,与你在配置中写的顺序无关:`rename` → `copy` → `gsub` → `uppercase`/`lowercase` → `strip` → `replace` → `join` → `split` → `merge` → `coerce` → `convert` → `add_field` → `remove_field`。如果 rename 在 convert 之后才生效,可能让类型转换的目标字段名对不上。遇到这类问题时,可以拆成两个 mutate 块来控制顺序: ```conf filter { mutate { rename => { "resp_code" => "response" } } mutate { convert => { "response" => "integer" } } } ``` ## Date 过滤器:统一时间戳格式 Logstash 用 `@timestamp` 作为事件的时间基准,但原始日志中的时间格式千差万别。Date 过滤器的作用就是把各种格式的时间字符串解析成 Logstash 内部的 ISO8601 时间对象。 ### 解析多种时间格式 ```conf filter { date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy-MM-dd HH:mm:ss", "ISO8601" ] } } ``` match 的第二个参数是格式数组,Joda-Time 格式和 ISO8601 关键字都可以混用。解析成功后,`@timestamp` 自动更新。 ### 指定时区和目标字段 ```conf filter { date { match => ["log_time", "yyyy-MM-dd HH:mm:ss"] timezone => "Asia/Shanghai" target => "parsed_time" } } ``` 如果不指定 target,默认写入 `@timestamp`。如果只想保留解析结果但不动 `@timestamp`,就指定一个自定义的 target 字段。 ## GeoIP 过滤器:IP 地址转地理位置 GeoIP 根据 IP 地址查询 MaxMind 数据库,自动补充城市、国家、经纬度等地理信息,是做访问地图可视化的前提。 ```conf filter { geoip { source => "client_ip" target => "geo" fields => ["city_name", "country_name", "location"] } } ``` source 指定待查询的 IP 字段,fields 限制只输出需要的地理字段,避免写入过多无用数据。注意 Logstash 默认内置了 GeoLite2 数据库,但如果需要更精确的数据,需要手动下载并指定 database 路径。 ## JSON 过滤器:解析嵌套 JSON 日志 现代应用的日志越来越倾向于直接输出 JSON 格式,JSON 过滤器可以把它展开成 Logstash 的事件字段: ```conf filter { json { source => "message" target => "parsed" } } ``` 如果指定了 target,解析结果会放在 target 字段下形成嵌套结构;不指定则直接铺平到顶层。生产中建议指定 target,避免字段名冲突。解析后通常配合 mutate 删除原始 message 字段: ```conf filter { json { source => "message" target => "log" } mutate { remove_field => ["message"] } } ``` ## Useragent 过滤器:解析浏览器信息 从 HTTP 请求的 User-Agent 头中提取浏览器名称、版本、操作系统等信息: ```conf filter { useragent { source => "agent" target => "ua" } } ``` 通常跟在 Grok 解析 Apache 日志之后使用,`agent` 字段就是 Grok 从日志中提取出来的 User-Agent 字符串。 ## CSV 过滤器:处理表格数据 CSV 过滤器用于解析逗号(或其他分隔符)分隔的文本数据: ```conf filter { csv { separator => "," columns => ["name", "age", "city"] autodetect_column_types => true } } ``` columns 指定每列的字段名,autodetect_column_types 让 Logstash 自动识别数值类型。如果 CSV 首行是表头,也可以省略 columns 让它自动读取。 ## Ruby 过滤器:处理复杂逻辑 当内置过滤器无法满足需求时,Ruby 过滤器提供了完全的编程能力: ```conf filter { ruby { code => ' status = event.get("response").to_i if status >= 400 event.tag("error") event.set("error_level", status >= 500 ? "server_error" : "client_error") end ' } } ``` Ruby 过滤器灵活但性能开销大,Grok 能搞定的事情不要用 Ruby。实际项目中,Ruby 过滤器多用于多字段联合计算、条件标签打标等场景。 ## Drop 过滤器:丢弃不需要的事件 Drop 过滤器直接丢弃整个事件,不会传到输出阶段。常见用法是过滤掉调试日志或特定来源的噪声数据: ```conf filter { if [log_level] == "DEBUG" or [message] =~ /^health check/ { drop { } } } ``` 使用时注意加上条件判断,否则会丢掉所有事件。 ## 过滤器的组合与顺序 实际项目中,过滤器总是组合使用的。一个典型的 Nginx 访问日志处理管线: ```conf filter { # 第一步:用 Grok 把日志拆成字段 grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } # 第二步:转换数值类型 mutate { convert => { "response" => "integer" } convert => { "bytes" => "integer" } remove_field => ["message"] } # 第三步:解析时间戳 date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] } # 第四步:补充地理位置 geoip { source => "clientip" } # 第五步:解析 User-Agent useragent { source => "agent" target => "ua" } } ``` 过滤器的执行顺序就是配置中的书写顺序。一般遵循"先解析、再转换、后丰富"的原则:Grok 在最前面把原始文本拆开,Mutate 紧跟其后做类型转换和字段清理,Date/GeoIP/Useragent 等根据已有字段做信息补充。 ## 日常排错建议 - **Grok 匹配失败**:检查 Kibana Grok Debugger,确认模式与日志格式一致。生产环境务必处理 `_grokparsefailure` 标签事件。 - **Mutate 顺序问题**:记住 Mutate 内部操作有固定执行顺序,不受配置书写位置影响。遇到 rename 和 convert 冲突时,拆成两个 mutate 块。 - **Date 时区偏移**:`@timestamp` 默认是 UTC 时间,查询时注意时区换算。如果业务强依赖本地时间,在 date 过滤器中指定 timezone。 - **GeoIP 数据库过旧**:Logstash 内置的 GeoLite2 不会自动更新,地理信息不准确时需要手动下载最新数据库。 - **性能瓶颈**:Grok 是 CPU 密集型操作,复杂模式会导致吞吐量下降。可以考虑用 dissect 过滤器替代简单格式的 Grok 匹配,dissect 基于分隔符定位,性能更好。
服务端5月27日 14:20
如何从零编写一个完整的 Logstash 配置文件?当你在凌晨两点被叫醒,因为日志管道断了——那一刻你会意识到,理解 Logstash 配置文件的每一行到底在做什么,不是锦上添花,而是生存技能。 ## Logstash 配置文件的三段式骨架 Logstash 的配置文件本质上只做三件事:从哪读数据、怎么处理数据、往哪写数据。对应的就是 `input`、`filter`、`output` 三个区块,数据像流水线一样依次穿过它们。 ```conf input { # 数据从哪里来 } filter { # 数据怎么加工(可选) } output { # 数据到哪里去 } ``` 其中 `input` 和 `output` 是必需的,`filter` 可以省略。三个区块的声明顺序固定为 input-filter-output,但 Logstash 并不强制——只是惯例如此,调换位置也能运行,只是阅读和维护时会非常混乱。 ## input:数据的入口 input 插件决定数据源的类型和接入方式。以下是生产环境最常用的四种。 ### file 插件——读取本地日志 ```conf input { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" tags => ["nginx"] } } ``` `start_position` 控制首次读取是从文件头还是文件尾开始;`sincedb_path` 记录已读取的文件偏移量,避免重启后重复消费。如果想在测试时每次都从头读,把 `sincedb_path` 设为 `/dev/null`。 ### beats 插件——接收 Elastic Agent 数据 ```conf input { beats { port => 5044 ssl => true ssl_certificate => "/etc/logstash/certs/logstash.crt" ssl_key => "/etc/logstash/certs/logstash.key" } } ``` 这是 Elastic Stack 生态中最主流的采集方式。Filebeat、Metricbeat 等轻量采集器将数据推送到这个端口,Logstash 作为中继做进一步加工。 ### kafka 插件——消费消息队列 ```conf input { kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["app-logs", "access-logs"] group_id => "logstash-consumer" consumer_threads => 4 decorate_events => true } } ``` 从 Kafka 消费数据适合高吞吐、解耦的场景。`decorate_events` 为 true 时会在事件中添加 Kafka 元数据(topic、partition、offset),便于后续追溯。 ### http 插件——接收 HTTP 推送 ```conf input { http { port => 8080 codec => json additional_codecs => { "application/json" => "json" } } } ``` 适用于应用主动推送 JSON 日志的场景,比如 Webhook 回调或自定义 SDK 上报。 ## filter:数据的加工车间 filter 是 Logstash 最有价值的部分,负责把非结构化的原始数据变成可查询的结构化字段。 ### grok 插件——正则解析日志 ```conf filter { grok { match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status:int} %{NUMBER:duration:float}ms" } overwrite => ["message"] } } ``` grok 基于正则表达式,但提供了大量预定义模式(如 `%{IP}`、`%{TIMESTAMP_ISO8601}`、`%{GREEDYDATA}`),避免从零写正则。解析失败时会自动添加 `_grokparsefailure` 标签,可以据此做告警。 常用内置模式速查: | 模式 | 匹配内容 | 示例 | |------|----------|------| | `%{IP}` | IPv4/IPv6 地址 | 192.168.1.1 | | `%{HOSTNAME}` | 主机名 | web-server-01 | | `%{TIMESTAMP_ISO8601}` | ISO 时间戳 | 2024-01-15T10:30:00 | | `%{GREEDYDATA}` | 贪婪匹配剩余内容 | 任意字符串 | | `%{QUOTEDSTRING}` | 带引号字符串 | "hello world" | ### mutate 插件——字段变换 ```conf filter { mutate { rename => { "resp_code" => "http_status" } remove_field => ["headers", "cookies"] lowercase => ["request_path"] strip => ["user_input"] copy => { "source_ip" => "client_ip" } } } ``` mutate 做的是"脏活":重命名字段让语义更清晰、删掉无用字段减少存储、大小写转换统一格式。这些操作琐碎但直接影响后续查询体验。 ### date 插件——时间戳解析 ```conf filter { date { match => ["log_time", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" timezone => "Asia/Shanghai" } } ``` date 插件将日志中的时间字符串解析为 Logstash 事件的标准 `@timestamp` 字段。这一步至关重要——如果没有正确解析时间,Elasticsearch 中的时序查询和索引路由都会出错。注意 `match` 的格式必须与日志中的实际格式完全对应,否则静默失败。 ## output:数据的出口 ### elasticsearch 插件——写入 ES ```conf output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "app-logs-%{+YYYY.MM.dd}" user => "elastic" password => "${ES_PASSWORD}" ssl => true ssl_certificate_verification => false action => "create" } } ``` `index` 中的 `%{+YYYY.MM.dd}` 会根据事件的 `@timestamp` 动态生成按天分割的索引名。`action => "create"` 保证相同文档 ID 只写入一次,避免重复。密码等敏感信息用 `${ENV_VAR}` 从环境变量读取,不要硬编码在配置文件里。 ### file 插件——落盘归档 ```conf output { file { path => "/data/archive/%{type}-%{+YYYY-MM-dd}.log" codec => line { format => "%{message}" } } } ``` 适合需要将处理后的数据持久化到文件系统的场景,如审计日志归档、合规数据留存。 ### stdout 插件——调试利器 ```conf output { stdout { codec => rubydebug } } ``` `rubydebug` codec 会以结构化的可读格式输出事件的全部字段,是排查配置问题的第一工具。调试时可以用它替代正式 output,确认 filter 解析结果正确后再切换回去。 ## 条件判断:让配置具备分支逻辑 实际场景中,不同来源的日志需要不同的处理路径。Logstash 支持在 filter 和 output 中使用 `if`/`else if`/`else` 条件判断。 ```conf filter { if [type] == "nginx" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } mutate { remove_field => ["message"] } } else if [type] == "app" { json { source => "message" } date { match => ["timestamp", "ISO8601"] } } else { drop { } } } output { if "_grokparsefailure" in [tags] { file { path => "/var/log/logstash/parse-failed-%{+YYYY-MM-dd}.log" } } elasticsearch { hosts => ["localhost:9200"] index => "%{type}-%{+YYYY.MM.dd}" } } ``` 条件表达式支持的比较运算符:`==`、`!=`、`<`、`>`、`<=`、`>=`,以及正则匹配 `=~` 和 `!~`。逻辑运算符为 `and`、`or`、`not`。可以用 `[field]` 引用事件字段,`in` 判断数组包含关系。 一个实用模式:将 grok 解析失败的事件单独输出到文件,既不丢数据,又不污染主索引。 ## 多管道配置:隔离不同的数据流 当一条 Logstash 实例需要处理多种互不相干的数据流时,用多管道(pipelines)替代单管道内的条件分支会更清晰。 在 `config/pipelines.yml` 中定义: ```yaml - pipeline.id: nginx-pipeline path.config: "/etc/logstash/conf.d/nginx.conf" pipeline.workers: 4 pipeline.batch.size: 250 - pipeline.id: app-pipeline path.config: "/etc/logstash/conf.d/app.conf" pipeline.workers: 2 pipeline.batch.size: 125 ``` 每个管道有独立的配置文件、worker 线程数和 batch 大小,互不干扰。如果某个管道的 filter 出了问题,不会拖垮其他管道。 管道之间还可以通过 `pipeline` input/output 插件通信: ```conf # 管道 A 的 output output { pipeline { send_to => [enrichment] } } # 管道 B 的 input input { pipeline { address => enrichment } } ``` ## 性能调优的关键参数 配置写对了只是第一步,跑得稳才是生产环境的要求。 ### pipeline.workers 默认值是 CPU 核心数。对于 CPU 密集型的 filter(尤其是 grok),不要盲目调大——worker 过多会导致上下文切换开销增大。一般设为 CPU 核数或略低即可。 ### pipeline.batch.size 每次批量处理的事件数,默认 125。调大可以提高吞吐量,但会增加内存占用和单次处理延迟。对于 grok 较重的场景,建议从 125 开始逐步调到 250-500 观察效果。 ### pipeline.batch.delay 批次等待时间,默认 50ms。降低这个值可以减少延迟,但可能让批次更小、吞吐下降。对延迟敏感的场景可以调到 10-20ms。 ### queue.type 默认是内存队列(memory),重启丢数据。生产环境建议用持久化队列(persisted): ```conf queue.type: persisted path.queue: /data/logstash/queue queue.max_bytes: 4gb ``` 持久化队列将事件写入磁盘,Logstash 异常退出后可以从断点恢复,代价是吞吐量下降约 10-20%。 ### grok 的性能陷阱 grok 是 Logstash 中最耗 CPU 的插件。两个优化方向: 一、将多个 grok match 拆成按条件分支执行,避免每条事件都跑完所有正则: ```conf filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGLINE}" } } } else if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } } ``` 二、用 `patterns_dir` 加载自定义模式,将复杂正则拆分成命名片段,既提升可读性也便于缓存复用。 ## 一个完整的配置示例 以下是一个涵盖了上述所有要点的生产级配置: ```conf input { beats { port => 5044 type => "beats" } kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["app-logs"] group_id => "logstash-consumer" consumer_threads => 3 decorate_events => true type => "kafka-app" } } filter { if [type] == "beats" and "nginx" in [tags] { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } mutate { remove_field => ["message", "prospector"] } } else if [type] == "kafka-app" { json { source => "message" } date { match => ["timestamp", "ISO8601"] target => "@timestamp" } mutate { rename => { "lvl" => "log_level" } lowercase => ["log_level"] } } if "_grokparsefailure" in [tags] or "_jsonparsefailure" in [tags] { mutate { add_field => { "parse_error" => "true" } } } } output { if [parse_error] == "true" { file { path => "/var/log/logstash/failed-%{+YYYY-MM-dd}.log" codec => line { format => "%{message}" } } } else { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "%{type}-%{+YYYY.MM.dd}" user => "elastic" password => "${ES_PASSWORD}" } } stdout { codec => rubydebug } } ``` 理解 Logstash 配置的关键不是记住多少插件参数,而是建立起 input-filter-output 的思维模型:数据从哪来、到哪去、中间怎么变。在这个框架下,每个插件只是填空题。遇到问题时,按这个顺序排查:数据进来了吗(查 input 日志)?字段解析对了吗(用 stdout + rubydebug 看)?写进目标了吗(查 output 日志和目标系统)?三步定位,比盲目改配置高效得多。