u Input section input { # Suricata file { path => ["/data/eve.json"] codec => json type => "Suricata" } file { path => ["/data/capec.json"] codec => json type => "CAPEC" } } # Filter Section filter { # Suricata if [type] == "Suricata" { date { match => [ "timestamp", "ISO8601" ] } translate { refresh_interval => 86400 field => "[alert][signature_id]" destination => "[alert][cve_id]" dictionary_path => "/etc/listbot/cve.yaml" # fallback => "-" } } # Drop if parse fails if "_grokparsefailure" in [tags] { drop {} } # Add geo coordinates / ASN info / IP rep. if [src_ip] { geoip { cache_size => 10000 source => "src_ip" database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb" } geoip { cache_size => 10000 source => "src_ip" database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-ASN.mmdb" } translate { refresh_interval => 86400 field => "src_ip" destination => "ip_rep" dictionary_path => "/etc/listbot/iprep.yaml" } } # In some rare conditions dest_port, src_port, status are indexed as string, forcing integer for now if [dest_port] { mutate { convert => { "dest_port" => "integer" } } } if [src_port] { mutate { convert => { "src_port" => "integer" } } } if [status] { mutate { convert => { "status" => "integer" } } } } # Output section output { elasticsearch { hosts => ["elasticsearch:9200"] } }