基本规则介绍

  1. tag(<>)包裹的是核心处理模块,有以下几种(前5项较为常用),tag标签内可以添加模式匹配 比如通配符*等
    1. <source>:定义日志数据的来源。这个标签下的配置指定了 Fluentd 如何监听和收集日志数据,比如通过监控文件、接收网络上的日志等。
    2. <parser>:定义日志解析规则,通常在<source><filter>标签中使用,用于指定如何从原始日志数据中解析出结构化信息。
    3. <match>:定义日志数据的最终输出规则。比如写入文件、发送到远程服务器等
    4. <formatter>:定义日志格式化规则,用于指定如何将日志数据转换为特定格式,通常在<match>标签中使用。
    5. <filter>:中间层处理,将其tag标记的日志进行添加、删除字段等处理。
    6. <storage>:用于配置存储插件,这些插件可以被其他插件用来存储状态或数据。
    7. <label>:用于创建日志处理的逻辑分组,可以将多个<match><filter>配置块组织在一个<label>下,以便于管理和复用。
    8. <system>:用于配置 Fluentd 的系统级参数,如日志级别、进程内部缓存大小等。
  2. @标签后通常用于较小的配置块或插件,然后后续跟随的非@参数都是附属与@
  3. @include较特殊,通常用于引入或者复用另一个文件
  4. 通常用@id定义tag对应模块名,用于定位日志问题产生的位置

采集模块

 现在详细介绍<source>下的指令

  1. 第一通常通过@+插件类型来指定输入源,常见有@tail从文本文件最后进行采集,还有@http通过搭建一个http服务来采集
  2. @idtag定义这个模块产生日志名称,用于下一步处理,read_from_head则定义是否第一次加载从头开始
  3. 使用<parse>定义具体处理逻辑,这里对parse进行一些说明:
    • @type xxx定义处理方法,xxx有json(将源数据按照json处理),regexp(使用正则解析,下一行跟expression xxxx),csv格式(下一行keys [具体的列名]逗号隔开),multi_format按照多个模式解析(后面使用多个<pattern>标签依次包裹处理逻辑,如果前面的pattern没匹配上,则依次往后继续), multiline解析多行,ltsv(解析LTSV格式),none不进行任何处理
    • time_format用于如何将时间信息提取出来,通常需要和time_key配合使用,否则默认情况,会按照获取日志的时间作为时间戳,会有磁盘带来的延迟

中间模块

 现在详细介绍<filter>下的指令,<filter>可以类比,我们目前用的最多的是@type kubernetes_metadata 用于添加k8s的源数据,这里保持原样,即:

<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
  skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
  skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
  skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
  skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
  watch "#{ENV['FLUENT_KUBERNETES_WATCH'] || 'true'}"
</filter>

发送模块

 现在详细介绍<match>下的指令,首先列举常用的几种处理模块的范例:

  1. elasticsearch
<match **>
  @type elasticsearch
  host es.example.com
  port 9200
  index_name fluentd
  type_name _doc
  logstash_format true
</match>
  1. kafka
<match **>
  @type kafka
  brokers kafka.example.com:9092
  default_topic fluentd_logs
  <format>
    @type json
  </format>
</match>
  1. file(写入本地文件)
<match **>
  @type file
  path /var/log/fluentd/logs
  time_slice_format %Y%m%d
  time_slice_wait 10m
  compress gzip
</match>
  1. http(发送到远程服务器)
<match **>
  @type http
  endpoint http://example.com/log
  http_method post
  <format>
    @type json
  </format>
</match>
  1. stdout,直接输出,通常用于调试
<match **>
  @type stdout
</match>
  1. forward 发到另一台fluent实例
<match **>
  @type forward
  <server>
    host fluentd-receiver.example.com
    port 24224
  </server>
</match>
  1. 还有mongodb,s3(amazon S3),gelf(发到graylog服务器)
    其他的一些说明点
  • @log_level 定义发送日志的等级,
  • <match>中通常需要配置<buffer>来定义缓冲区,主要用来减少网络压力,流量控制,以及提高可靠性(网络异常后等待发送),这个bffer的配置给个示例
   <buffer>
     flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
     flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
     chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
     queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
     retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
     retry_forever true
   </buffer>

最终配置

    <source>
      @type tail
      @id in_tail_container_logs
      path "#{ENV['FLUENT_CONTAINER_TAIL_PATH'] || '/var/log/containers/*.log'}"
      pos_file "#{File.join('/var/log/', ENV.fetch('FLUENT_POS_EXTRA_DIR', ''), 'fluentd-containers.log.pos')}"
      tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
      exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
      read_from_head true
      <parse>
        @type regexp
        expression /^(?<time_k8s_record>.+) (?<stream>stdout|stderr) (?<log_flag>[A-Z])[\t\f\v ]+(?<log>.*)$/
        time_format "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TIME_FORMAT'] || '%Y-%m-%dT%H:%M:%S.%N%z'}"
        time_key time_k8s_record
      </parse>
    </source>
    <filter kubernetes.**>
      @type record_modifier
      <record>
        log ${record["log"].gsub(/\e\[[0-9;]*m/, '')}
      </record>
    </filter>
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
      verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
      ca_file "#{ENV['KUBERNETES_CA_FILE']}"
      skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
      skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
      skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
      skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
      watch "#{ENV['FLUENT_KUBERNETES_WATCH'] || 'true'}"
    </filter>
    <match **>
      @type elasticsearch
      @id out_es
      @log_level info
      include_tag_key true
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
      ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1'}"
      user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
      password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
      reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
      reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
      reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
      log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
      logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
      logstash_dateformat "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_DATEFORMAT'] || '%Y.%m.%d'}"
      logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
      index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
      target_index_key "#{ENV['FLUENT_ELASTICSEARCH_TARGET_INDEX_KEY'] || use_nil}"
      type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
      include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'false'}"
      template_name "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_NAME'] || use_nil}"
      template_file "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_FILE'] || use_nil}"
      template_overwrite "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_OVERWRITE'] || use_default}"
      sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
      request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '5s'}"
      application_name "#{ENV['FLUENT_ELASTICSEARCH_APPLICATION_NAME'] || use_default}"
      <buffer>
        flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
        flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
        chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
        queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
        retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
        retry_forever true
      </buffer>
    </match>

ps:FLUENT_KUBERNETES_METADATA_SKIP_LABELS标签最好开启,否则会出现一部分pod有label标签但是另一部分没有label,然后存到es的时候如果没有label的先存了,es在接收有label标签的就会报错