基本规则介绍
- tag(
<>
)包裹的是核心处理模块,有以下几种(前5项较为常用),tag标签内可以添加模式匹配 比如通配符*等<source>
:定义日志数据的来源。这个标签下的配置指定了 Fluentd 如何监听和收集日志数据,比如通过监控文件、接收网络上的日志等。<parser>
:定义日志解析规则,通常在<source>
或<filter>
标签中使用,用于指定如何从原始日志数据中解析出结构化信息。<match>
:定义日志数据的最终输出规则。比如写入文件、发送到远程服务器等<formatter>
:定义日志格式化规则,用于指定如何将日志数据转换为特定格式,通常在<match>
标签中使用。<filter>
:中间层处理,将其tag标记的日志进行添加、删除字段等处理。<storage>
:用于配置存储插件,这些插件可以被其他插件用来存储状态或数据。<label>
:用于创建日志处理的逻辑分组,可以将多个<match>
和<filter>
配置块组织在一个<label>
下,以便于管理和复用。<system>
:用于配置 Fluentd 的系统级参数,如日志级别、进程内部缓存大小等。
@
标签后通常用于较小的配置块或插件,然后后续跟随的非@
参数都是附属与@
的@include
较特殊,通常用于引入或者复用另一个文件- 通常用
@id
定义tag对应模块名,用于定位日志问题产生的位置
采集模块
现在详细介绍<source>
下的指令
- 第一通常通过
@
+插件类型来指定输入源,常见有@tail
从文本文件最后进行采集,还有@http
通过搭建一个http服务来采集 @id
下tag
定义这个模块产生日志名称,用于下一步处理,read_from_head
则定义是否第一次加载从头开始- 使用
<parse>
定义具体处理逻辑,这里对parse进行一些说明:@type xxx
定义处理方法,xxx有json(将源数据按照json处理),regexp(使用正则解析,下一行跟expression xxxx),csv格式(下一行keys [具体的列名]
逗号隔开),multi_format按照多个模式解析(后面使用多个<pattern>
标签依次包裹处理逻辑,如果前面的pattern没匹配上,则依次往后继续), multiline解析多行,ltsv(解析LTSV格式),none不进行任何处理time_format
用于如何将时间信息提取出来,通常需要和time_key
配合使用,否则默认情况,会按照获取日志的时间作为时间戳,会有磁盘带来的延迟
中间模块
现在详细介绍<filter>
下的指令,<filter>
可以类比,我们目前用的最多的是@type kubernetes_metadata
用于添加k8s的源数据,这里保持原样,即:
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
ca_file "#{ENV['KUBERNETES_CA_FILE']}"
skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
watch "#{ENV['FLUENT_KUBERNETES_WATCH'] || 'true'}"
</filter>
发送模块
现在详细介绍<match>
下的指令,首先列举常用的几种处理模块的范例:
- elasticsearch
<match **>
@type elasticsearch
host es.example.com
port 9200
index_name fluentd
type_name _doc
logstash_format true
</match>
- kafka
<match **>
@type kafka
brokers kafka.example.com:9092
default_topic fluentd_logs
<format>
@type json
</format>
</match>
- file(写入本地文件)
<match **>
@type file
path /var/log/fluentd/logs
time_slice_format %Y%m%d
time_slice_wait 10m
compress gzip
</match>
- http(发送到远程服务器)
<match **>
@type http
endpoint http://example.com/log
http_method post
<format>
@type json
</format>
</match>
- stdout,直接输出,通常用于调试
<match **>
@type stdout
</match>
- forward 发到另一台fluent实例
<match **>
@type forward
<server>
host fluentd-receiver.example.com
port 24224
</server>
</match>
- 还有
mongodb
,s3
(amazon S3),gelf
(发到graylog服务器)
其他的一些说明点
@log_level
定义发送日志的等级,<match>
中通常需要配置<buffer>
来定义缓冲区,主要用来减少网络压力,流量控制,以及提高可靠性(网络异常后等待发送),这个bffer的配置给个示例
<buffer>
flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
retry_forever true
</buffer>
最终配置
<source>
@type tail
@id in_tail_container_logs
path "#{ENV['FLUENT_CONTAINER_TAIL_PATH'] || '/var/log/containers/*.log'}"
pos_file "#{File.join('/var/log/', ENV.fetch('FLUENT_POS_EXTRA_DIR', ''), 'fluentd-containers.log.pos')}"
tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
read_from_head true
<parse>
@type regexp
expression /^(?<time_k8s_record>.+) (?<stream>stdout|stderr) (?<log_flag>[A-Z])[\t\f\v ]+(?<log>.*)$/
time_format "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TIME_FORMAT'] || '%Y-%m-%dT%H:%M:%S.%N%z'}"
time_key time_k8s_record
</parse>
</source>
<filter kubernetes.**>
@type record_modifier
<record>
log ${record["log"].gsub(/\e\[[0-9;]*m/, '')}
</record>
</filter>
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
ca_file "#{ENV['KUBERNETES_CA_FILE']}"
skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
watch "#{ENV['FLUENT_KUBERNETES_WATCH'] || 'true'}"
</filter>
<match **>
@type elasticsearch
@id out_es
@log_level info
include_tag_key true
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1'}"
user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
logstash_dateformat "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_DATEFORMAT'] || '%Y.%m.%d'}"
logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
target_index_key "#{ENV['FLUENT_ELASTICSEARCH_TARGET_INDEX_KEY'] || use_nil}"
type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'false'}"
template_name "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_NAME'] || use_nil}"
template_file "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_FILE'] || use_nil}"
template_overwrite "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_OVERWRITE'] || use_default}"
sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '5s'}"
application_name "#{ENV['FLUENT_ELASTICSEARCH_APPLICATION_NAME'] || use_default}"
<buffer>
flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
retry_forever true
</buffer>
</match>
ps:FLUENT_KUBERNETES_METADATA_SKIP_LABELS
标签最好开启,否则会出现一部分pod有label标签但是另一部分没有label,然后存到es的时候如果没有label的先存了,es在接收有label标签的就会报错