Filtros para Logstash

En este post os enseñaremos varios ejemplos de filtros para Logstash. Son muy útiles para preparar los datos que después se visualizan en Kibana. No os olvidéis de añadir los correspondientes prospectors en Filebeat.

Syslog

# Parses events whose type is "syslog".
filter {
  if [type] == "syslog" {
    # Break the raw line into timestamp, host, program, optional PID and
    # message body, and stamp the event with its original @timestamp and host.
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
      }
      add_field => {
        "received_at"   => "%{@timestamp}"
        "received_from" => "%{host}"
      }
    }

    # Runs the syslog_pri filter with its default settings.
    syslog_pri {}

    # Parse syslog_timestamp; both the space-padded single-digit day and the
    # two-digit day layouts are accepted.
    date {
      match => [
        "syslog_timestamp",
        "MMM  d HH:mm:ss",
        "MMM dd HH:mm:ss"
      ]
    }
  }
}

Auth

# Parses auth.log events (type "authlog"): sshd logins, sudo, groupadd and
# useradd lines, with a catch-all pattern for anything else.
filter {
  if [type] == "authlog" {
    # Patterns are tried in order; the last one is a catch-all that keeps the
    # program name and the (possibly multi-line) message.
    # Fixes over the previously published version:
    #   * the sudo pattern contained an emoji where the regex fragment ":("
    #     belongs (blog-engine smiley conversion), so it could never match;
    #   * the useradd shell was captured into [system][auth][user][add]
    #     instead of [system][auth][user][add][shell];
    #   * groupadd fields used dotted names, unlike every other capture here.
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
               "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
      # Custom pattern: like GREEDYDATA, but also spans newlines.
      pattern_definitions => {
        "GREEDYMULTILINE"=> "(.|\n)*"
      }
      remove_field => "message"
    }
    # Parse the captured syslog timestamp (space-padded or two-digit day).
    date {
      match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    # Geolocate the SSH client address captured by the sshd patterns.
    geoip {
      source => "[system][auth][ssh][ip]"
      target => "[system][auth][ssh][geoip]"
    }
  }
}

Accesos Apache

# Parses Apache access-log events (type "apache-access").
filter {
  if [type] == "apache-access" {
    # All captures land under [apache2][access][...]; the raw message is
    # dropped once grok succeeds. The second pattern covers requests logged
    # as "-" (no request line). Its brackets are escaped with a single
    # backslash, like the first pattern; the doubled backslash used before
    # would be read as a literal backslash followed by a character class.
    grok {
      match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?","%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }
      remove_field => "message"
    }
    # The filters below previously read clientip/response/bytes/timestamp/
    # agent/verb, which this grok never creates; they now reference the
    # [apache2][access][...] fields actually captured above.
    geoip {
      source => "[apache2][access][remote_ip]"
      target => "geoip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      # Two add_field entries on the same key build a [longitude, latitude] pair.
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
      add_tag => [ "apache-geoip" ]
    }
    mutate {
      convert => {
        "[apache2][access][response_code]"    => "integer"
        "[apache2][access][body_sent][bytes]" => "integer"
        "[geoip][coordinates]"                => "float"
      }
      add_field => { "src_ip" => "%{[apache2][access][remote_ip]}" }
      replace => { "@source_host" => "%{host}" }
      # Removed: convert of "responsetime" and rename of "verb" (neither field
      # is captured; the method is already stored in [apache2][access][method])
      # and replace of "@message" (the source "message" is removed by grok).
    }
    # Parse the request time captured by grok, then drop the raw string.
    date {
      match => [ "[apache2][access][time]", "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "[apache2][access][time]" ]
    }
    # Expand the User-Agent header into the "useragent" field.
    useragent {
      source => "[apache2][access][agent]"
      target => "useragent"
      remove_field => [ "[apache2][access][agent]" ]
    }
  }
}

Errores Apache

# Parses Apache error-log events (type "apache-error").
filter {
  if [type] == "apache-error" {
    # Two layouts are tried in order: "[time] [level] ..." and
    # "[time] [module:level] [pid N(:tid N)] ...". The second layout stores
    # its text in a temporary message1 field, renamed further down.
    grok {
      match => {
        "message" => [
          "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
          "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}"
        ]
      }
      # Custom pattern for the error-log time: day name, month, day, time, year.
      pattern_definitions => {
        "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
      }
      remove_field => "message"
    }

    # Fold the temporary capture of the second layout into the common field.
    mutate {
      rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
    }

    # Parse the captured time, with or without fractional seconds, then
    # discard the raw string.
    date {
      match => [
        "[apache2][error][timestamp]",
        "EEE MMM dd H:m:s YYYY",
        "EEE MMM dd H:m:s.SSSSSS YYYY"
      ]
      remove_field => "[apache2][error][timestamp]"
    }
  }
}

Nginx

# Parses Nginx access-log (type "nginx-access") and error-log
# (type "nginx-error") events.
filter {
  if [type] == "nginx-access" {
    # Combined log format, with anything after it kept in extra_fields.
    # NOTE(review): the field names used below (clientip, response, bytes,
    # verb, agent, timestamp) are the legacy COMBINEDAPACHELOG names; confirm
    # ECS compatibility mode is not renaming them in your Logstash version.
    grok {
      match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
      overwrite => [ "message" ]
      remove_tag => [ "beats_input_codec_plain_applied" ]
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      # Two add_field entries on the same key build a [longitude, latitude] pair.
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
      add_tag => [ "nginx-geoip" ]
    }
    mutate {
      convert => ["response", "integer"]
      convert => ["bytes", "integer"]
      convert => ["responsetime", "float"]
      add_field => [ "src_ip", "%{clientip}" ]
      convert => [ "[geoip][coordinates]", "float"]
      replace => [ "@source_host", "%{host}" ]
      replace => [ "@message", "%{message}" ]
      rename => [ "verb" , "method" ]
    }
    # Parse the request time captured by grok, then drop the raw string.
    date {
      match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
    useragent {
      source => "agent"
    }
  }
  if [type] == "nginx-error" {
    # Fixes over the previously published version:
    #   * match was a bare two-element array, so the first regex was taken as
    #     the FIELD NAME and the second as its pattern; both patterns are now
    #     applied to "message";
    #   * the "host:" value was captured into client_ip, colliding with the
    #     client address captured earlier in the same pattern; it now goes to
    #     request_host.
    # The second pattern is a fallback for lines without the connection details.
    grok {
      match => {
        "message" => [
          "(?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[%{DATA:err_severity}\] (%{NUMBER:pid:int}#%{NUMBER}: \*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:, client: (?<client_ip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer})?",
          "%{DATESTAMP:timestamp} \[%{DATA:err_severity}\] %{GREEDYDATA:err_message}"
        ]
      }
      remove_tag => [ "beats_input_codec_plain_applied", "_grokparsefailure" ]
    }
  }
}

MySQL

# Parses MySQL error-log events (type "mysql-error").
filter {
  if [type] == "mysql-error" {
    # Three layouts are tried in order: "<digits> <time> [level] msg",
    # ISO8601 timestamp with thread id and level, and a catch-all that keeps
    # the whole line. The last two use temporary message1/message2 fields
    # that are renamed to the common message field below.
    grok {
      match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
        "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
        "%{GREEDYDATA:[mysql][error][message2]}"] }
      # Custom pattern: a run of digits followed by a time.
      pattern_definitions => {
        "LOCALDATETIME" => "[0-9]+ %{TIME}"
      }
      remove_field => "message"
    }
    mutate {
      rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
    }
    mutate {
      rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
    }
    # Parse the captured timestamp, then drop the raw string. The field removed
    # here used to be [apache2][access][time] (copy-paste from the Apache
    # filter), which left the MySQL timestamp string on every event.
    date {
      match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
      remove_field => "[mysql][error][timestamp]"
    }
  }
}
# Parses MySQL slow-query-log events (type "mysql-slow").
filter {
  if [type] == "mysql-slow" {
    # Discard the "# Time:" header lines before entries are stitched together.
    if [message] =~ "^# Time:.*$" {
      drop {}
    }

    # Every line that does not start a new "# User@Host:" entry is appended
    # to the previous event.
    # NOTE(review): the multiline filter plugin may not be bundled with recent
    # Logstash releases; confirm it is installed, or join lines in the shipper.
    multiline {
      pattern => "^# User@Host:.*$"
      negate  => true
      what    => "previous"
    }

    # Three entry layouts, tried in order: plain entry, entry with a
    # "use <database>;" statement, and entry whose client is given before an
    # empty "[]" pair.
    grok {
      match => {
        "message" => [
          "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ \[%{IP:client_ip}\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$",
          "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ \[%{IP:client_ip}\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*use %{GREEDYDATA:database};\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$",
          "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ %{GREEDYDATA:client_ip} \[\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$"
        ]
      }
      remove_tag => [ "beats_input_codec_plain_applied", "_grokparsefailure" ]
    }

    # The "SET timestamp=" value is parsed as epoch seconds.
    date {
      match => [ "log_timestamp", "UNIX" ]
    }

    # Kept as a separate mutate (not a remove_field on the date filter) so the
    # field is removed regardless of whether the date parse succeeded.
    mutate {
      remove_field => "log_timestamp"
    }
  }
}

Usar Geoip

Para que Logstash pueda geolocalizar IPs necesitamos la base de datos GeoLite2-City.mmdb. Para obtenerla:
# cd /etc/logstash/
# # The anonymous geolite.maxmind.com download was retired; GeoLite2 now needs a
# # free MaxMind account. Replace YOUR_LICENSE_KEY with the key from that account.
# wget -O GeoLite2-City.tar.gz "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City&license_key=YOUR_LICENSE_KEY&suffix=tar.gz"
# tar -xzvf GeoLite2-City.tar.gz
# mv GeoLite2-City_20*/GeoLite2-City.mmdb ./
# systemctl restart logstash.service

Si no queremos usar geolocalización, basta con comentar las líneas del bloque geoip en los filtros:

#    geoip {
#      source => "clientip"
#      target => "geoip"
#      database => "/etc/logstash/GeoLite2-City.mmdb"
#      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
#      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
#      add_tag => [ "nginx-geoip" ]
#    }

Fuente: Bubbl ELK y Elastic

To write a comment on this article, fill out the form below. Fields marked with an asterisk (*) are required.