process_suricata_log.py 3.35 KB
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: chribell

The purpose of this script is to process Suricata's output json file (eve.json)
and find possible connections between Suricata rules -> CVEs -> CWEs -> CAPECs.

Arguments
---------
1. Path of CAPEC sqlite db
2. Path of eve.json
3. Path of output json file


Example
---------
./process_suricata_log.py ./capecdb.sqlite /var/log/suricata/eve.json /var/log/suricata/capec.json
"""


import time
from sys import argv
from os import SEEK_END, path, linesep
import json
from sqlitedict import SqliteDict


def suricata_event(line):
    """
    Turn one raw eve.json line into a slim event dictionary.

    Parameters
    ----------
    line : JSON text of a single Suricata event.

    Returns
    -------
    A dict holding the event 'timestamp' and, when the event carries an
    'alert' object, its signature id, signature, category and severity.

    Raises
    ------
    KeyError if 'timestamp' (or one of the alert fields) is absent;
    json.loads errors propagate for malformed input.
    """
    record = json.loads(line)
    event = {'timestamp': record['timestamp']}

    # non-alert events (flow, dns, stats, ...) only keep their timestamp
    if 'alert' not in record:
        return event

    alert = record['alert']
    wanted = ('signature_id', 'signature', 'category', 'severity')
    event.update((name, alert[name]) for name in wanted)
    return event


def tail(file):
    """
    Follow an open file like ``tail -f`` and yield every newly appended line.

    The file position is first moved to the end, so only content written
    after the call is reported. The generator never terminates; it polls
    the file every 0.1 seconds while no new data is available.

    Parameters
    ----------
    file : open file object (text or binary) supporting seek/readline.

    Yields
    ------
    Complete lines, including their trailing newline.
    """
    file.seek(0, SEEK_END)
    pending = None  # fragment of a line whose newline has not been written yet
    while True:
        chunk = file.readline()
        if not chunk:
            time.sleep(0.1)
            continue
        pending = chunk if pending is None else pending + chunk
        # readline() at EOF may hand back a half-written line; hold it back
        # until the writer finishes it, otherwise the consumer would receive
        # a truncated (and unparsable) JSON record
        newline = '\n' if isinstance(pending, str) else b'\n'
        if not pending.endswith(newline):
            continue
        # using a generator is preferred, just to consume every new line
        yield pending
        pending = None

def extract_capec_paths(db, capec):
    """
    Build the tree of ancestors of a CAPEC entry.

    Parameters
    ----------
    db    : mapping whose 'capec' entry maps a CAPEC id to a record with a
            'parents' list.
    capec : id of the CAPEC entry to start from.

    Returns
    -------
    Nested dict: each key is 'capec-<parent id>' and each value is the
    same structure computed for that parent (empty dict at the root).
    """
    parent_ids = db['capec'][capec]['parents']
    return {
        'capec-' + parent: extract_capec_paths(db, parent)
        for parent in parent_ids
    }


def extract_cwe_paths(db, cwe):
    """
    Build the tree reachable from a CWE: related CAPECs and parent CWEs.

    Parameters
    ----------
    db  : mapping with a 'cwe-capec' entry (CWE id -> CAPEC ids) and a
          'cwe' entry (CWE id -> record with a 'parents' list).
    cwe : id of the CWE to start from.

    Returns
    -------
    Nested dict whose 'capec-<id>' keys hold the CAPEC ancestor trees and
    whose 'cwe-<id>' keys hold the same structure for each parent CWE.
    CAPEC branches are inserted before CWE branches.
    """
    cwe_to_capec = db['cwe-capec']

    # branches towards attack patterns, only when the CWE has a related CAPEC
    capec_branches = {}
    if cwe in cwe_to_capec:
        capec_branches = {
            'capec-' + capec_id: extract_capec_paths(db, capec_id)
            for capec_id in cwe_to_capec[cwe]
        }

    # branches towards the more general weaknesses
    cwe_branches = {
        'cwe-' + parent: extract_cwe_paths(db, parent)
        for parent in db['cwe'][cwe]['parents']
    }

    return {**capec_branches, **cwe_branches}


def extract_paths(db, cve):
    """
    Build the CWE/CAPEC tree reachable from a CVE.

    Parameters
    ----------
    db  : mapping with a 'cve-cwe' entry (CVE id -> CWE ids).
    cve : id of the CVE to start from.

    Returns
    -------
    Nested dict keyed by 'cwe-<id>' for every CWE linked to the CVE, or an
    empty dict when the CVE is not related to any CWE.
    """
    if cve in db['cve-cwe']:
        return {
            'cwe-' + weakness: extract_cwe_paths(db, weakness)
            for weakness in db['cve-cwe'][cve]
        }
    # the CVE is not related to any CWE
    return {}

def format_paths(key, value):
    """
    Flatten a nested path tree into a list of root-to-leaf paths.

    Parameters
    ----------
    key   : label of the current node.
    value : dict of child label -> child subtree (empty for a leaf).

    Returns
    -------
    List of lists; every inner list starts with `key` and follows one
    branch of `value` down to a leaf.
    """
    if not value:
        # leaf node: the path consists of the key alone
        return [[key]]

    flattened = []
    for child, subtree in value.items():
        flattened.extend([key] + rest for rest in format_paths(child, subtree))
    return flattened

def write_to_file(output_path, evt, paths):
    """
    Append one JSON line per path to the output file.

    Parameters
    ----------
    output_path : file to append to (created when missing), written as UTF-8.
    evt         : event dictionary copied into every record.
    paths       : iterable of paths; each one is stored under the 'path'
                  key of its own record.
    """
    with open(output_path, 'a', encoding='utf-8') as out:
        for p in paths:
            json.dump({**evt, 'path': p}, out, ensure_ascii=False)
            # text mode already translates '\n' to the platform line ending;
            # writing os.linesep here would produce '\r\r\n' on Windows
            out.write('\n')

def start(db_path, log_path, output_path):
    """
    Watch the Suricata log and append the CVE/CWE/CAPEC paths of each alert.

    Runs until interrupted: every new line of the log is parsed, alerts
    whose signature id is mapped to a CVE in the database are expanded
    into paths, and those paths are appended to the output file.

    Parameters
    ----------
    db_path     : path of the CAPEC sqlite db.
    log_path    : path of eve.json.
    output_path : path of the output json file.
    """
    # context managers make sure both handles are released when the loop is
    # left through an exception (e.g. KeyboardInterrupt); the log is read as
    # UTF-8, matching the encoding used for the output file
    with SqliteDict(db_path) as db, \
            open(log_path, encoding='utf-8') as log_file:
        # tail() is a generator that keeps watching for new lines
        for line in tail(log_file):
            evt = suricata_event(line)

            # Suricata rule 2200007 is used for padding, so we ignore it
            if 'signature_id' not in evt or evt['signature_id'] == 2200007:
                continue

            sid = str(evt['signature_id'])
            sid_to_cve = db['sid-cve']  # fetched once per event
            if sid not in sid_to_cve:
                continue
            cve = sid_to_cve[sid]

            tree = extract_paths(db, cve)

            # every first-level node is emitted on its own, followed by all
            # the root-to-leaf paths that start from it
            # NOTE(review): for a node without children format_paths() also
            # returns [[node]], so that path is written twice - confirm this
            # is intended
            paths = []
            for root, subtree in tree.items():
                paths.append([root])
                paths.extend(format_paths(root, subtree))

            evt['cve'] = cve
            write_to_file(output_path, evt, paths)



def main():
    """
    Command-line entry point.

    Expects exactly three arguments (CAPEC sqlite db, eve.json, output
    file); the first two must exist. On invalid usage the error message is
    reported through SystemExit, i.e. printed to stderr with a non-zero
    exit status.
    """
    if len(argv) == 4 and path.exists(argv[1]) and path.exists(argv[2]):
        start(argv[1], argv[2], argv[3])
    else:
        raise SystemExit('Error, please provide the paths of capecdb.sqlite, eve.json and output')


# run only when executed as a script, so importing the module has no side effects
if __name__ == '__main__':
    main()