# Input section
input {
  # Two JSON log files are tailed; each event is stamped with a `type`
  # so the filter stage can tell the sources apart.

  # Suricata EVE log
  file {
    type => "Suricata"
    codec => json
    path => ["/data/eve.json"]
  }

  # CAPEC log
  file {
    type => "CAPEC"
    codec => json
    path => ["/data/capec.json"]
  }
}
# Filter Section
filter {
  # Suricata-only processing.
  if [type] == "Suricata" {
    # Use the event's own "timestamp" field (ISO8601) as the event time.
    date {
      match => [ "timestamp", "ISO8601" ]
    }

    # Map the alert signature ID to a CVE ID via the listbot dictionary.
    # The dictionary file is re-read every 86400 seconds (once a day).
    translate {
      refresh_interval => 86400
      field => "[alert][signature_id]"
      destination => "[alert][cve_id]"
      dictionary_path => "/etc/listbot/cve.yaml"
      # fallback => "-"
    }
  }

  # Drop if parse fails.
  # Both inputs decode with the json codec, which tags undecodable lines
  # with "_jsonparsefailure". No grok filter is visible in this file, so
  # checking "_grokparsefailure" alone would never drop anything; that check
  # is kept for compatibility in case a grok filter is added.
  if "_grokparsefailure" in [tags] or "_jsonparsefailure" in [tags] {
    drop {}
  }

  # Add geo coordinates / ASN info / IP rep.
  if [src_ip] {
    # NOTE(review): both database paths are pinned to the bundled
    # logstash-filter-geoip 6.0.3 gem directory; they presumably need
    # updating whenever that plugin is upgraded.

    # City-level lookup.
    geoip {
      cache_size => 10000
      source => "src_ip"
      database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
    }

    # ASN lookup against the same source field.
    geoip {
      cache_size => 10000
      source => "src_ip"
      database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-ASN.mmdb"
    }

    # IP reputation lookup; dictionary is re-read once a day.
    translate {
      refresh_interval => 86400
      field => "src_ip"
      destination => "ip_rep"
      dictionary_path => "/etc/listbot/iprep.yaml"
    }
  }

  # In some rare conditions dest_port, src_port, status are indexed as string, forcing integer for now
  if [dest_port] {
    mutate {
      convert => { "dest_port" => "integer" }
    }
  }
  if [src_port] {
    mutate {
      convert => { "src_port" => "integer" }
    }
  }
  if [status] {
    mutate {
      convert => { "status" => "integer" }
    }
  }
}
# Output section
output {
  # Ship every event that survives the filter stage to Elasticsearch.
  elasticsearch {
    hosts => [
      "elasticsearch:9200"
    ]
  }
}