fluentd配置文件详解
fluentd配置文件详解
fluentd是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP请求内容。
配置:
source:定义输入,数据的来源,input方向。
match:定义输出,下一步的去向,如写入文件,或者发送到指定软件存储。output方向。
filter:定义过滤,也即事件处理流水线,一般在输入和输出之间运行,可减少字段,也可丰富信息。
system:系统级别的设置,如日志级别、进程名称。
label:定义一组操作,从而实现复用和内部路由。
@include:引入其他文件,和Java、python的import类似。
输出调试:
1 | <source> |
收集nginx日志配置:
1 | <source> |
数据被收集后按照用户配置的解析规则,形成一系列event。每一个event包含如下内容:
1 | tag = xxx |
其中:
- tag:为数据流的标记。fluentd中可以具有多个数据源,解析器,过滤器和数据输出。他们之前使用tag来对应。类似于数据流按照tag分组。数据流向下游的时候只会进入tag相匹配的处理器。
- time:event产生的时间,该字段通常由日志内的时间字段解析出来。
- record:日志的内容,为JSON格式。
fluentd支持多种数据的解析过滤和输出操作。其中常用的有:
- tail输入:增量读取日志文件作为数据源,支持日志滚动。
- exec输入:定时执行命令,获取输出解析后作为数据源。
- syslog输出:解析标准的syslog日志作为输入。
- forward输入:接收其他fluentd转发来的数据作为数据源。
- dummy:虚拟数据源,可以定时产生假数据,用于测试。
- regexp解析器:使用正则表达式命名分组的方式提取出日志内容为JSON字段。
- record_transformer过滤器:人为修改record内的字段。
- file输出:用于将event落地为日志文件。
- stdout:将event输出到stdout。如果fluentd以daemon方式运行,输出到fluentd的运行日志中。
- forward:转发event到其他fluentd节点。
- copy:多路输出,复制event到多个输出端。
- kafka:输出event到Kafka。
- webhdfs:输出event到HDFS。
- elasticsearch:输出event到HDFS。
可以安装并启动fluentd。
配置文件位置
编辑fluentd配置文件的方法:
1 | vi /etc/td-agent/td-agent.conf |
修改运行用户和组
默认来说fluentd使用td-agent用户启动。如果需要修改fluentd的用户,需要执行:
1 | vi /usr/lib/systemd/system/td-agent.service |
文件内容如下所示:
1 | [Unit] |
修改Service
部分User
和Group
配置项可以更改fluentd进程的用户和组。
检测配置文件是否正确的方法
在shell中运行:
1 | /opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf |
观察输出,如果有错误会给出对应提示。
数据流逻辑
fluentd以tag值为基准,决定数据的流经哪些处理器。
数据的流向为:source -> parser -> filter -> output
input配置
tail
增量读取日志文件。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。
tail针对日志滚动的支持:
tail方式采用跟踪文件inode的方式进行。比如日志名为app.log
,如果日志发生滚动,被重命名为app.log.1
。文件重命名的时候inode是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail会跟踪旧文件的inode一段时间(rotate_wait
配置),这段时间过去之后,tail不再监听app.log.1
,开始监听新的app.log
文件。
tail方式的示例配置:
1 | <source> |
注意:如果文件发生修改会输出全量文件内容。
配置项解释
tag:数据源的tag值。*
号可以扩展为path(/
替换为.
)。例如
1 | path /path/to/file |
tag会被扩展为foo.path.to.file
path:配置读取的路径。可以使用*
或者是strftime
。例如:
1 | path /path/to/%Y/%m/%d/* |
如果今天是2020年1月2日,fluentd会读取/path/to/2020/01/02
目录下的内容。
也可以配置多个路径,使用逗号分隔:
1 | path /path/to/a/*,/path/to/b/c.log |
exclude_path:排除部分目录或文件,使用数组格式配置。
1 | path /path/to/* |
refresh_interval:多长时间刷新一次文件监听列表,配合*
使用才有意义。
pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。
注意,不要再多个source之间共用pos file,否则会出现问题。
pos_file_compaction_interval:pos file文件压缩时间间隔。用于压缩pos file中不再监听的记录,不可解析的记录以及重复的记录。
parse标签:用于指定log的解析器(必须的配置项)。
例如:
1 | # json |
path_key:如果配置此项,监控文件的path会在event中,此项的key为path_key
。
例如:
1 | path /path/to/access.log |
生成的数据如下所示:
1 | {"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"} |
rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait
。
exec
周期性执行命令,抽取命令输出为event。
示例配置:
1 | <source> |
以上命令的含义为每10秒钟执行cmd arg arg
命令,提取命令执行结果,以空白字符分隔三个字段的值为k1,k2,k3。其中k1的值作为tag,k2作为时间字段,使用%Y-%m-%d %H:%M:%S
格式。
一个例子,周期获取系统的平均负载。配置方法如下:
1 | <source> |
输出的日志格式为:
1 | 2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"} |
syslog
连接rsyslog。可以作为rsyslog的接收端。
一个配置的例子:
1 | <source> |
fluentd打开5140端口监听rsyslog发来的log。
rsyslog配置文件/etc/rsyslog.conf
设置为:
1 | # Send log messages to Fluentd |
fluentd解析到的event格式如下:
1 | tag = "#{@tag}.#{facility}.#{priority}" |
dummy
专用于测试的数据源。周期产生假数据。
配置举例:
1 | <source> |
dummy常用参数:
- tag: 标记值
- size:每次发送的event数量
- rate:每秒产生多少个event
- auto_increment_key:自增键名。如果配置了此项,会有一个key为该配置项值的自增键
- suspend:重启后自增值是否重新开始
- dummy:测试数据内容
forward
用于接收其他fluentd forward过来的event。
示例配置:
1 | <source> |
output配置
file
输出event为文件。默认每天输出一个日志文件。
示例配置:
1 | <match pattern> |
包含的参数类型:
- path:path支持placeholder,可以在日志路径中嵌入时间,tag和record中的字段值。例如:
1 | path /path/to/${tag}/${key1}/file.%Y%m%d |
注意:buffer标签后面的内容为buffer chunk key。Buffer根据这些key分段。
- append:flush的chuck是否追加到已存在的文件后。默认为false,便于文件的并行处理。
- format标签,用来规定文件内容的格式,默认值为out_file。
- inject标签,用来为event增加time和tag等字段。
- add_path_suffix:是否增加path后缀
- path_suffix:path后缀内容,默认为
.log
。 - compress:采用什么压缩格式,默认不压缩。
- recompress:是否在buffer chunk已经压缩的情况再次压缩,默认为false。
forward
将event转发到其他的fluentd节点。如果配置了多个fluentd节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用copy。
配置示例:
1 | <match pattern> |
server标签内可以配置如下字段:
- host
- name
- port
- shared_key
- username
- password
- standby 标记server为备用,只有其他node不可用的时候才会启用standby的node
- weight 负载均衡的权重配置
copy
多路输出(复制event到多个输出端)
示例配置
1 | <match pattern> |
其中每一个store是一路输出。
重要参数:
- copy_mode:复制模式。可选值有
- no_copy:每路输出共享event。
- shallow:浅拷贝,如果不修改嵌套字段可以使用。
- deep:深拷贝,使用
msgpack-ruby
方式。 - marshal:深拷贝,使用
marshal
方式。
- store标签的ignore_error参数:如果被标记ignore_error的store出现错误,不会影响其他的store。官网的例子为:
1 | <match app.**> |
假如plugin1出现错误,plugin2也不会执行。如果在plugin1的store添加上ignore_error参数,如下所示:
1 | <match app.**> |
上述情况plugin2的运行不受影响。通常为不重要的store添加ignore_error参数。
http
通过http请求的方式发送event。
payload的格式由format标签决定。
示例配置:
1 | <match pattern> |
该例子使用http方式将event发送到http://logserver.com:9000/api
,使用post方式,连接超时时间为2秒。输出格式为json,每10秒钟输出一次。
注意:
如果使用JSON的方式发送,HTTP请求的content-type为application/x-ndjson (newline-delimited JSONs)。如果用spring mvc接收会提示不支持。可以使用HTTPServletRequest
接收request body。
stdout
标准输出的模式,如果使用后台模式运行fluentd,输出到fluentd的日志。多用于debug的时候。
配置方法:
1 | <match pattern> |
elasticsearch
输出event到elasticsearch。
示例配置:
1 | <match my.logs> |
可选参数:
- host:单个elasticsearch节点地址
- port:单个elasticsearch节点的端口号
- hosts:elasticsearch集群地址。格式为ip1:port1,ip2:port2...
- user和password:elasticsearch的认证信息
- scheme:使用https还是http。默认为http模式
- path:REST接口路径,默认为空
- index_name:index名称
- logstash_format:index是否使用logstash命名方式(
logstash-%Y.%m.%d
),默认不启用 - logstash_prefix:logstash_format启用的时候,index命名前缀是什么。默认为
logstash
kafka
把event输出到kafka。
示例配置如下:
1 | <match pattern> |
重要的参数为:
- brokers:Kafka brokers的地址和端口号
- topic_key:record中哪个key对应的值用作Kafka消息的key
- default_topic:如果没有配置topic_key,默认使用的topic名字
- format标签:确定发送的数据格式
- use_event_time:是否使用fluentd event的时间作为Kafka消息的时间。默认为false。意思为使用当前时间作为发送消息的时间
- required_acks:producer acks的值
- compression_codec:压缩编码方式
webhdfs
event通过REST方式写入到HDFS。
HADOOP启用webhdfs的方法
core-site.xml
1 | <configuration> |
hdfs-site.xml
1 | <configuration> |
最后执行$HADOOP_HOME/sbin/httpfs.sh start
命令启动webhdfs支持。
注意:此时webhdfs的端口号为50070。
示例配置和参数
示例配置:
1 | <match access.**> |
注意:需要保证HDFS的目标目录具有写入权限。debug过程发现fluentd请求webhdfs没有使用user proxy,HDFS认为操作的用户为dr.who,无法创建文件。为了解决这个问题,设置HDFS目标目录的权限为777。
重要参数:
- host:namenode的地址
- port:namenode的端口号
- path:写入文件路径。可以使用占位符或者ruby表达式。可以使用如下方式表示时间:
1 | \%Y: year including the century (at least 4 digits) |
输出参数:
- timekey:多久输出一次文件到HDFS。如果path中没有配置占位符,默认为86400(1天)。如果指定了和时间相关的占位符,则文件输出周期自动和最小的时间占位符单位一致
- timekey_wait:允许等待来迟日志的最长时间
- flush_interval:flush间隔时间,默认为不设置
- flush_at_shutdown:关闭的时候是否flush。如果使用内存类型的buffer,需要配置为true
parser配置
regexp
使用正则表达式命名分组的方式从日志(一行或多行)中提取信息。可以通过time_key指定event的time字段的名字。名字为time字段名的分组内容会被抽取为event时间。
一个在线测试正则表达式的工具:http://fluentular.herokuapp.com/
基本配置格式:
1 | <parse> |
正则表达式可以添加额外的参数:
忽略大小写:/.../i
多行匹配:/.../m。注意,此时.
匹配新行
同时使用忽略大小写和多行匹配:/.../im
一个例子,示例配置如下:
1 | <parse> |
如下的数据:
1 | [2013-02-28 12:00:00 +0900] alice engineer 1 |
会被解析为:
1 | time: |
filter配置
record_transformer
record_transformer用来修改event的结构,增加或修改字段。
一个record_transformer的例子:
1 | <filter foo.bar> |
这个filter匹配tag为foo.bar
的source。event增加了两个新的字段:hostname和tag。
其中hostname这里使用了ruby表达式。tag使用了字符串插值。
如果数据为:
1 | {"message":"hello world!"} |
会被转换为:
1 | {"message":"hello world!", "hostname":"db001.internal.example.com", "tag":"foo.bar"} |
可以通过添加enable_ruby配置,在${}
中使用ruby表达式。
例如:
1 | <filter foo.bar> |
如下输入:
1 | {"total":100, "count":10} |
会被转换为:
1 | {"total":100, "count":10, "avg":"10"} |
注意,可以启用auto_typecast true
配置实现自动类型转换。
修改字段的例子:
1 | <filter foo.bar> |
如下输入:
1 | {"message":"hello world!"} |
会被修改为:
1 | {"message":"yay, hello world!"} |
可以在表达式中配置tag_parts变量,引用tag的第n部分。如下所示:
1 | <filter web.*> |
如果遇到tag为web.auth
的数据:
1 | {"user_id":1, "status":"ok"} |
会被转换为:
1 | {"user_id":1, "status":"ok", "service_name":"auth"} |
record标签
record标签的语法为:
1 | <record> |
表达式中可以配置如下变量:
- record:获取record中某些字段的内容。例如
record["count"]
- tag:获取tag的内容
- time:获取日志的时间戳
- hostname:获取主机名字,和
#{Socket.gethostname}
作用一样 - tag_parts[N]:tag以
.
分隔,获取tag的第N部分 - tag_prefix[N]:获取tag的0-N部分
- tag_suffix[N]:获取tag的N-结尾部分
例如tag为debug.my.app
,tag_parts[1]
返回my
。tag_prefix
和tag_suffix
的结果如下:
1 | tag_prefix[0] = debug tag_suffix[0] = debug.my.app |
配置文件使用通配符和扩展
<match>
和<filter>
标签可以使用通配符和扩展。
tag以.
为分隔符,分隔为多个部分。
fluentd支持的通配符和扩展有:*
:只匹配一个部分。比如a.*
匹配a.b
,但是不匹配a
或a.b.c
。**
:匹配0个或多个部分。比如a.**
匹配a
,a.b
和a.b.c
。{X,Y,Z}
:匹配X或Y或Z。#{expression}
:使用嵌入的ruby表达式。有一些快捷变量可以直接使用,例如#{hostname}
和#{worker_id}
。${..}
:使用变量值,tag,record
可以使用如下的方式指定默认值。例如:#{ENV["FOOBAR"] || use_default}
。如果FOOBAR环境变量不存在,则使用use_default
这个值。
注意:match标签的匹配过程是有顺序的。比如说下面的例子:
1 | <match **> |
因为上面的match总是能被匹配到,下面的match永远没有机会执行。
Buffer
buffer为fluentd很关键的配置,意为缓冲区。可以决定收集的数据存入什么介质,多长时间输出一次等。
buffer标签必须配置在match标签内(即在输出端配置)。
buffer具有一个@type属性,用来配置buffer的储存介质:
1 | <buffer> |
@type有两个值:
- file:存入文件
- memory:存入内存,这个是默认值
buffer标签后面可以跟随chunk keys,用来决定buffer以record的什么字段来分段存放。例如:
1 | <buffer ARGUMENT_CHUNK_KEYS> |
注意:
- 可以指定多个buffer chunk keys,使用逗号分隔。
- 如果没有配置chunk key,所有的event都会写入同一个chunk file,直到buffer滚动。
buffer如果使用time作为chunk key,可以按照时间对buffer进行分段。其中:
- timekey:时间的跨度
- timekey_wait:flush延迟时间,用于等待迟到的数据
官网的例子如下:
1 | <match tag.**> |
部分经常用到的配置参数:
- timekey_use_utc:使用国际标准时间还是当地时间,默认是使用当地时间。
- timekey_zone:指定时区。
- chunk_limit_size:chunk大小限制,默认8MB。
- chunk_limit_records:chunk event条数限制。
- total_limit_size:总buffer大小限制。
- chunk_full_threshold:chunk大小超过chunk_limit_size * chunk_full_threshold时会自动flush。
- queued_chunks_limit_size:限制队列中的chunk数目,防止频繁flush产生过多的chunk。
- compress:压缩格式,可使用text或gzip。默认为text。
- flush_at_shutdown:关闭时候是否flush。对于非持久化buffer默认值为true,持久化buffer默认值为false。
- flush_interval:多长时间flush一次。
- retry_timeout:重试flush的超时时间。在这个时间后不再会retry。
- retry_forever:是否永远尝试flush。如果设置为true会忽略retry_timeout的配置。
- retry_max_times:重试最大次数。
- retry_type:有两个配置值:retry时间间隔,指数级增长或者是固定周期重试。
- retry_wait:每次重试等待时间。
- retry_exponential_backoff_base:retry时间指数扩大倍数。
- retry_max_interval:最长retry时间间隔。
- retry_randomize:是否随机retry时间间隔。
配置文件重用
可以通过@include 配置文件路径
方式,引用其他配置文件片段到fluentd主配置文件中。
配置文件路径可以使用绝对路径或相对路径。相对路径的基准路径为fluentd主配置文件所在的路径。
@include
可以出现在主配置文件的任何位置。
Docker日志输出到fluentd
通过配置fluentd logging driver的方式实现。
该driver发送的log信息包含:
字段 | 描述 |
---|---|
container_id | 64字符的container id |
container_name | container名字 |
source | stdout或stderr |
log | container的log |
全局配置方式
修改/etc/docker/daemon.json
,增加如下内容:
1 | { |
然后重启docker daemon使配置生效。
也可以通过添加--log-driver
和--log-opt
参数的方式指定某个container使用fluentd logging driver。如下所示:
1 | docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224 |
可以通过在--log-opt
后指定tag的方式,确定source的tag。
Docker官网参考链接:https://docs.docker.com/config/containers/logging/fluentd/
配置实例
实例1
采集/root/my.txt
文件(内容格式为key value),并发送到http://localhost:9090/
。
fluentd的配置文件如下:
1 | <source> |
实例2
提取用户操作记录,打印到fluentd日志。
1 | <source> |
实例3
收集用户操作记录转发到另一个fluentd节点,同时将数据发送到Kafka和存入HDFS。
数据流为:fluentd采集端 -> fluentd收集端 -> kafka和HDFS
示例用户操作记录数据为:
1 | root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130 tail -f /var/log/command.his.log |
采集节点的配置:
1 | <source> |
fluentd收集节点的配置:
1 | <source> |