Processing Logs with Logstash

Logstash Service Configuration

The Logstash service is configured as follows in docker-compose.yaml:

  logstash:
    build:
      context: logstash/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./logstash/config/logstash.yml
        target: /usr/share/logstash/config/logstash.yml
        read_only: true
      - type: bind
        source: ./logstash/pipeline
        target: /usr/share/logstash/pipeline
        read_only: true
    ports:
      - "9600:9600"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - elk
    depends_on:
      - elasticsearch
  • Build the image from the Dockerfile in the logstash/ directory, passing ELK_VERSION as a build argument
  • Volume mounts:
    • Bind-mount the logstash.yml configuration file and the pipeline/ directory from the host into the container, read-only (see the logstash.yml sketch below)
  • Publish port 9600, Logstash's HTTP monitoring API
  • Cap the JVM heap at 256 MB via LS_JAVA_OPTS
  • Join the network named elk, so the container can reach the other containers on that network (here, kafka and elasticsearch)
  • Start after the elasticsearch service, per depends_on
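The mounted logstash.yml itself is not shown here. In a setup like this it usually only needs to bind Logstash's HTTP API to all interfaces, so that the monitoring endpoint published on port 9600 is reachable from outside the container. A minimal sketch (the actual file may carry additional settings):

# logstash/config/logstash.yml — minimal sketch, not necessarily the exact file
http.host: "0.0.0.0"   # expose the HTTP/monitoring API (port 9600) beyond localhost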

Logstash Pipeline Configuration

The logstash/pipeline/nginx.conf configuration:

### INPUTS
input {
    kafka  {
      codec => "json"
      topics_pattern => "elk-nginx-log"
      bootstrap_servers => "kafka:9092"
      auto_offset_reset => "latest"
      group_id => "nginx-log"
    }
}

### FILTERS
filter {

    grok {

      # Extract structured fields from the Nginx log line
      match => {
          "message" => [
          # Nginx access log format, e.g.:
          # 172.19.0.1 - - [21/May/2020:09:34:14 +0000] "GET /index.html?f=hello HTTP/1.1" 200 5 0.000 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.92 Safari/537.36" "-"
          '%{IPV4:ip} - - \[%{HTTPDATE:[@metadata][timestamp]}\] "%{WORD:method} %{URIPATH:path}(?:%{GREEDYDATA:query}|) HTTP/%{NUMBER}" %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}) (?:%{NUMBER:request_time}) %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for}'
          ]
      }

      remove_field => ["message"]
    }
    
    if [path] {

      # Drop requests for static assets by extension
      if [path] =~ /\.(js|css|jpeg|jpg|png|gif|ico|swf|svg)$/ {
        drop {}
      }

      # Use the timestamp from the log line as the event's @timestamp
      date {
          match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ]
          remove_field => ["input_type", "offset", "tags", "beat" ]
      }

      mutate {
          # Convert field data types
          convert => {
              "status"          => "integer"
              "body_bytes_sent" => "integer"
              "request_time"    => "float"
          }
      }

      if [query] {
        kv {
          prefix => "p_"
          source => "query"
          field_split => "&?"

          # To keep only the parameters of interest, uncomment the whitelist:
          #include_keys => [ "uid", "vn" ]
          remove_field => ["query" ]
        }

        # URL-decode all fields
        urldecode {
          all_fields => true
        }
      }
    }
}


### OUTPUTS

output {

  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "logstash-%{[fields][topic]}-%{+YYYY.MM.dd.HH}"
  }
}

A Logstash pipeline configuration is divided into three sections: input, filter, and output.
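As a minimal illustration of this structure (not this project's pipeline), the following reads lines from stdin and prints the resulting events to stdout, which is handy for trying out filters locally:

input {
    stdin { }
}

# an empty filter section is allowed and may simply be omitted
filter { }

output {
    stdout { codec => rubydebug }   # print every event in full
}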

The input section: the kafka input subscribes to the elk-nginx-log topic on the broker kafka:9092 as consumer group nginx-log and decodes every record as JSON. auto_offset_reset => "latest" means that when the consumer group has no committed offset yet, it starts reading from the newest messages.
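Since the topic name here is a literal string rather than a pattern, the kafka input's topics option would be an equivalent, slightly more direct way to subscribe (topics_pattern is only needed when matching topics by regular expression):

    kafka {
      codec => "json"
      topics => ["elk-nginx-log"]
      bootstrap_servers => "kafka:9092"
      auto_offset_reset => "latest"
      group_id => "nginx-log"
    }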

The filter section: grok parses the raw message into named fields and then removes it; requests for static assets (js, css, images, and so on) are dropped outright; date overwrites @timestamp with the time recorded in the log line; mutate converts status, body_bytes_sent, and request_time to numeric types; for requests that carry a query string, kv splits it into individual fields prefixed with p_, and urldecode decodes any percent-encoded values.
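For the sample access-log line quoted in the grok comment above, this filter chain yields an event roughly like the following (abbreviated; values shown after type conversion and kv parsing):

{
    "ip" => "172.19.0.1",
    "method" => "GET",
    "path" => "/index.html",
    "p_f" => "hello",
    "status" => 200,
    "body_bytes_sent" => 5,
    "request_time" => 0.0,
    "@timestamp" => 2020-05-21T09:34:14.000Z,
    ...
}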

The output section: events are written to Elasticsearch at elasticsearch:9200, one index per hour. The index name interpolates [fields][topic], a field presumably attached upstream (for example by Filebeat) before the event reached Kafka, yielding indices such as logstash-elk-nginx-log-2020.05.21.09.
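While developing the pipeline, it can help to temporarily add a second output so that every event is also printed to the container's log (a debugging aid, not part of the config above):

output {

  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "logstash-%{[fields][topic]}-%{+YYYY.MM.dd.HH}"
  }

  # temporary: dump each event to the container log for inspection
  stdout { codec => rubydebug }
}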

About Grok

Grok matches text against a library of named, reusable regular-expression patterns. The full list of built-in patterns can be browsed at https://grokdebug.herokuapp.com/patterns#
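A pattern reference such as %{IPV4:ip} names a predefined regular expression (IPV4) and the field to capture the match into (ip); patterns may be composed from other patterns. A few of the built-in definitions used in the pipeline above:

WORD \b\w+\b
NUMBER (?:%{BASE10NUM})
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}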

To develop and test patterns interactively, use a Grok Debugger, such as the online one at https://grokdebug.herokuapp.com/ or the one built into Kibana under Dev Tools.