Blame view
CEPTD/docker/suricata/dist/process_suricata_log.py
3.78 KB
0d8c0f816 initial commit |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: chribell

The purpose of this script is to process Suricata's output json file (eve.json)
and find possible connections between Suricata rules -> CVEs -> CWEs -> CAPECs.

Arguments
---------
1. Path of CAPEC sqlite db
2. Path of eve.json
3. Path of output json file

Example
---------
./process_suricata_log.py ./capecdb.sqlite /var/log/suricata/eve.json /var/log/suricata/capec.json
"""
import time
from sys import argv
from os import SEEK_END, path, linesep  # linesep kept for backward compatibility (no longer used)
import json

# Fields copied from the "alert" object of a Suricata event.
ALERT_FIELDS = ('signature_id', 'signature', 'category', 'severity')

# Suricata rule 2200007 is used for padding, so events carrying it are ignored.
PADDING_RULE_SID = 2200007


def suricata_event(line):
    """Parse one eve.json line and keep only the fields this script needs.

    Parameters
    ----------
    line : str
        A single JSON document (one line of eve.json).

    Returns
    -------
    dict
        Always contains 'timestamp'; additionally contains every name in
        ALERT_FIELDS when the event has an 'alert' object.

    Raises
    ------
    json.JSONDecodeError
        If `line` is not valid JSON.
    KeyError
        If 'timestamp' or one of the alert fields is missing.
    """
    src = json.loads(line)
    evt = {'timestamp': src['timestamp']}
    if 'alert' in src:
        alert = src['alert']
        for field in ALERT_FIELDS:
            evt[field] = alert[field]
    return evt


def tail(file):
    """Yield every complete line appended to `file` from now on (never ends).

    The file is first positioned at its end, so existing content is skipped.
    When no new data is available the generator sleeps for 0.1s and retries.

    `readline()` returns a fragment without a trailing newline when it hits
    end-of-file in the middle of a line that is still being written. Such
    fragments are buffered until the newline arrives, so consumers only ever
    see whole lines.
    """
    file.seek(0, SEEK_END)
    pending = ''
    while True:
        chunk = file.readline()
        if not chunk:
            time.sleep(0.1)
            continue
        pending += chunk
        if not pending.endswith('\n'):
            # the writer has not finished this line yet; wait for the rest
            continue
        yield pending
        pending = ''


def _table_reader(db):
    """Return `table(name)`, a lazy accessor that reads `db[name]` at most once.

    With SqliteDict every `db[name]` access is a query plus a deserialization
    of the stored value, so the recursive walks below would otherwise re-read
    the same tables at every level. Tables are still read only when they are
    actually needed, exactly as with direct `db[name]` access.
    """
    cache = {}

    def table(name):
        if name not in cache:
            cache[name] = db[name]
        return cache[name]

    return table


def _capec_subtree(table, capec):
    """Build the nested dictionary of the ancestors of CAPEC id `capec`."""
    capec_table = table('capec')
    return {
        'capec-' + parent: {
            'name': capec_table[parent]['name'],
            **_capec_subtree(table, parent),
        }
        for parent in capec_table[capec]['parents']
    }


def _cwe_subtree(table, cwe):
    """Build the nested dictionary of related CAPECs and ancestors of `cwe`."""
    paths = {}
    related = table('cwe-capec')
    if cwe in related:
        # CWE has a related CAPEC
        for capec in related[cwe]:
            paths['capec-' + capec] = {
                'name': table('capec')[capec]['name'],
                **_capec_subtree(table, capec),
            }
    cwe_table = table('cwe')
    for parent in cwe_table[cwe]['parents']:
        paths['cwe-' + parent] = {
            'name': cwe_table[parent]['name'],
            **_cwe_subtree(table, parent),
        }
    return paths


def extract_capec_paths(db, capec):
    """Return the tree of parent CAPECs of `capec`.

    Each parent `p` appears under the key 'capec-<p>' and maps to a dict
    holding its 'name' plus, recursively, its own parents.
    `db['capec']` must map CAPEC ids to dicts with 'name' and 'parents'.
    """
    return _capec_subtree(_table_reader(db), capec)


def extract_cwe_paths(db, cwe):
    """Return the tree reachable from `cwe`.

    The result contains a 'capec-<id>' entry (with its parent CAPECs) for
    every CAPEC listed for `cwe` in `db['cwe-capec']`, and a 'cwe-<id>' entry
    (expanded recursively the same way) for every parent in
    `db['cwe'][cwe]['parents']`.
    """
    return _cwe_subtree(_table_reader(db), cwe)


def extract_paths(db, cve):
    """Return the CWE/CAPEC tree for `cve`, or {} if it maps to no CWE.

    `db['cve-cwe']` maps a CVE to an iterable of CWE ids; each of them becomes
    a 'cwe-<id>' entry holding its 'name' and its own sub-tree.
    """
    table = _table_reader(db)
    cve_to_cwe = table('cve-cwe')
    # check if the CVE is related to any CWE
    if cve not in cve_to_cwe:
        return {}
    paths = {}
    for cwe in cve_to_cwe[cve]:
        paths['cwe-' + cwe] = {
            'name': table('cwe')[cwe]['name'],
            **_cwe_subtree(table, cwe),
        }
    return paths


def format_paths(key, value):
    """Flatten a nested dict-of-dicts into the list of its root-to-leaf paths.

    `value` must be a mapping whose values are themselves such mappings; an
    empty mapping marks a leaf. Every returned path is a list of keys starting
    with `key`. This helper is not called by `start`.
    """
    if not value:
        return [[key]]  # empty dictionary, return key only
    return [
        [key] + tail_path
        for child_key, child in value.items()
        for tail_path in format_paths(child_key, child)
    ]


def write_to_file(output_path, evt):
    """Append `evt` to `output_path` as one UTF-8 encoded JSON line."""
    with open(output_path, 'a', encoding='utf-8') as out:
        json.dump(evt, out, ensure_ascii=False)
        # In text mode '\n' is translated to the platform line ending on
        # write; writing os.linesep here would produce '\r\r\n' on Windows.
        out.write('\n')


def start(db_path, log_path, output_path):
    """Watch the Suricata log forever and record alerts with a known CVE.

    For every new alert whose signature id is present in `db['sid-cve']`
    (the padding rule excluded), the event is extended with the matching
    'cve' and its CWE/CAPEC 'tree' and appended to `output_path`.
    `format_paths` can flatten that tree if a list of paths is needed.

    Parameters
    ----------
    db_path : str
        Path of the CAPEC sqlite db.
    log_path : str
        Path of eve.json.
    output_path : str
        Path of the output json file.
    """
    # Imported here rather than at module level so the pure helpers above
    # stay importable without the third-party dependency installed.
    from sqlitedict import SqliteDict

    # The context managers make sure both handles are released when the
    # loop is interrupted (e.g. by KeyboardInterrupt or a parsing error).
    with SqliteDict(db_path) as db, open(log_path, encoding='utf-8') as log_file:
        # tail() is a generator that keeps watching for new lines
        for line in tail(log_file):
            if not line.strip():
                continue  # nothing to parse on a blank line
            evt = suricata_event(line)
            if 'signature_id' not in evt or evt['signature_id'] == PADDING_RULE_SID:
                continue
            sid = str(evt['signature_id'])
            sid_to_cve = db['sid-cve']  # one read per event instead of two
            if sid not in sid_to_cve:
                continue
            cve = sid_to_cve[sid]
            evt['tree'] = extract_paths(db, cve)
            evt['cve'] = cve
            write_to_file(output_path, evt)


def main():
    """Validate the command line arguments and start watching the log."""
    if len(argv) - 1 == 3 and path.exists(argv[1]) and path.exists(argv[2]):
        start(argv[1], argv[2], argv[3])
    else:
        print('Error, please provide the paths of capecdb.sqlite, eve.json and output')


if __name__ == '__main__':
    main()