#mysql_slow.conf

input {
  file {
    # Tail the MySQL slow query log. The "mysql-slow" type is what the
    # filter section's conditional keys on.
    path => "/var/log/mysql_slow_log.log"
    type => "mysql-slow"
  }
}
  
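filter {
  if [type] == "mysql-slow" {
    # The '# Time:' header line is deliberately not parsed: the event time is
    # taken from the 'SET timestamp=' line (see the date filter below). The grep
    # filter that used to discard it is deprecated; the replacement would be a
    # conditional drop, e.g.
    #   if [message] =~ /^# Time: / { drop { } }
    # Left commented out so this block keeps the original behaviour.
    #grep {
    #    match => { message => [ "# Time: " ] }
    #    negate => true
    #}

    # Anything not starting with '#' or 'SET' is the SQL text itself, so fold
    # it into the preceding entry.
    multiline {
      pattern => "^#|^SET"
      negate => true
      what => "previous"
    }

    grok {
      # Each pattern must be one string: the Logstash config language has no
      # backslash line continuation, so a pattern split across lines embeds a
      # literal newline and then never matches the single-line header.
      match => {
        message => [
          "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOST:client_hostname}|) \[(%{IP:client_ip}|)\]",
          "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*Last_errno: %{NUMBER:last_errno:int} \s*Killed: %{NUMBER:killed:int}",
          "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}",
          "# Bytes_sent: %{NUMBER:bytes_sent:int}",
          "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:mysql_query}"
        ]
      }
      # Try every pattern rather than stopping at the first hit, so a message
      # that carries several header lines yields all of its fields. For a
      # single-header message this makes no difference.
      break_on_match => false
    }

    mutate {
      # Flatten the captured SQL onto one line: newline -> space, then collapse
      # double spaces. mutate takes ONE gsub array of comma-separated
      # field/pattern/replacement triples; the original used two gsub keys with
      # no commas between the strings, which does not parse.
      gsub => [
        "mysql_query", "\n", " ",
        "mysql_query", "  ", " "
      ]
      add_tag => "mutated_mysql_query"
      # NOTE(review): [mysql_query] is the verbatim statement from the slow
      # log. Depending on the workload it can contain literal values
      # (credentials in GRANT/ALTER USER, personal data in WHERE clauses); it
      # is indexed as-is, so consider a redacting gsub here if that matters.
    }

    # Header lines are joined with the line that follows them.
    multiline {
      pattern => "(# User|# Thread|# Query|# Time|# Bytes)"
      negate => false
      what => "next"
    }

    # The SET timestamp value is epoch seconds; it becomes @timestamp.
    date {
      match => [ "timestamp", "UNIX" ]
    }

    # The raw epoch string is redundant once @timestamp is set.
    mutate {
      remove_field => [ "timestamp" ]
    }
  }
}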
  
output {
    # rubydebug prints every event, including the raw SQL in [mysql_query],
    # to the Logstash process stdout; useful while debugging the pipeline.
    stdout { codec => rubydebug }
    elasticsearch {
        # A bare host:port with no scheme and no credentials, so events are
        # sent over unauthenticated HTTP. NOTE(review): the elasticsearch
        # output has ssl/cacert and user/password options; confirm what the
        # cluster side expects before relying on this outside an isolated
        # network.
        hosts => "192.168.0.100:9200"
        # One index per month.
        index => "mysql_slow_log-%{+YYYY.MM}"
    }
}


Kibana 图形展示