#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: chribell
This script is used to create a simple CAPEC database as an SQLite file.
Arguments
---------
1. Directory path of CAPEC sqlite db
2. Path of ouput sqlite file
Example
---------
./create_capec_db.py /directory_of_xml output_capec_db
"""
from sys import argv
from os import path
from sqlitedict import SqliteDict
import xml.etree.ElementTree as ET
def read_capec(capec_file):
"""
Parameters
----------
capec_file : str
The capec.xml file path.
Returns
-------
capecs : dict
A CAPEC dictionary.
"""
tree = ET.parse(capec_file)
root = tree.getroot()
namespace = '{http://capec.mitre.org/capec-3}'
attack_patterns = root[0]
capecs = {}
for ap in attack_patterns:
_id = ap.attrib['ID']
_name = ap.attrib['Name']
_description = ap.findall(namespace + 'Description')[0].text
cwes = []
relationships = {}
for relation in ap.findall(namespace + 'Related_Attack_Patterns/' + namespace + 'Related_Attack_Pattern'):
relationships[relation.attrib['CAPEC_ID']] = relation.attrib['Nature']
for cwe in ap.findall(namespace + 'Related_Weaknesses/' + namespace + 'Related_Weakness'):
cwes.append(cwe.attrib['CWE_ID'])
parents = [k for k, v in relationships.items() if v == 'ChildOf']
capecs[_id] = {
'id': _id,
'name': _name,
'description': _description,
'cwe': cwes,
'parents': parents,
'relationships': relationships,
}
return capecs
def read_cwe(cwe_file):
"""
Parameters
----------
cwe_file : str
The cwe.xml file path.
Returns
-------
cwes : dict
A CWE dictionary.
"""
tree = ET.parse(cwe_file)
root = tree.getroot()
namespace = '{http://cwe.mitre.org/cwe-6}'
weaknesses = root[0]
cwes = {}
for w in weaknesses:
_id = w.attrib['ID']
_name = w.attrib['Name']
_description = w.findall(namespace + 'Description')[0].text
cves = []
relationships = {}
for relation in w.findall(namespace + 'Related_Weaknesses/' + namespace + 'Related_Weakness'):
relationships[relation.attrib['CWE_ID']] = relation.attrib['Nature']
for cwe in w.findall(namespace + 'Observed_Examples/' + namespace + 'Observed_Example/' + namespace + 'Reference'):
cves.append(cwe.text)
parents = [k for k, v in relationships.items() if v == 'ChildOf']
cwes[_id] = {
'id': _id,
'name': _name,
'description': _description,
'cve': cves,
'parents': parents,
'relationships': relationships
}
return cwes
def read_cve(cve_file):
"""
Parameters
----------
cve_file : str
The cve.xml file path.
Returns
-------
cves : dict
A CVE dictionary.
"""
tree = ET.parse(cve_file)
root = tree.getroot()
namespace = '{http://cve.mitre.org/cve/downloads/1.0}'
cves = {}
for v in root:
_id = v.attrib['name']
cves[_id] = {
'id': _id,
'description': v.findall(namespace + 'desc')[0].text,
'type': v.attrib['type']
}
return cves
def add_children_graph(db):
"""
Parameters
----------
db : dict
Returns
-------
db : dict
Mutates the input dictionary with a map of parent-child hierarchy,
i.e. with a directed graph.
"""
graph = {}
for i in db:
for r in db[i]['relationships']:
if db[i]['relationships'][r] == 'ChildOf':
graph[r] = graph.get(r, [])
graph[r].append(i)
for i in db:
db[i]['children'] = graph[i] if i in graph else []
return db
def map_many_to_many(d, field):
"""
Parameters
----------
d : dict
The input dictionary.
field : str
The filtering key.
Returns
-------
m : dict
A filtered dictionary composed only with non empty lists.
m_inv : dict
The inverted dictionary of m.
"""
m = {k: v[field] for k, v in d.items() if len(v[field]) > 0}
m_inv = {}
for k, v in m.items():
for i in v:
m_inv[i] = m_inv.get(i, [])
m_inv[i].append(k)
return m, m_inv
def start(input_dir, db_path):
# read mapping
capec = read_capec(input_dir + '/capec.xml')
capec = add_children_graph(capec)
cwe = read_cwe(input_dir + '/cwe.xml')
cwe = add_children_graph(cwe)
# cve = read_cve(input_dir + '/cve.xml')
# read suricata sid to cve mapping
sid_cve = {}
with open(input_dir +'/sid-cve', 'r') as f:
for line in f:
tmp = line.split(' ')
sid_cve[tmp[0].strip(' "\'\t\r\n')] = tmp[1].strip(' "\'\t\r\n')
cwe_cve, cve_cwe = map_many_to_many(cwe, 'cve')
capec_cwe, cwe_capec = map_many_to_many(capec, 'cwe')
capecdb = SqliteDict(db_path, autocommit=True)
capecdb['capec'] = capec
capecdb['cwe'] = cwe
# capecdb['cve'] = cve
capecdb['sid-cve'] = sid_cve
capecdb['cve-cwe'] = cve_cwe
capecdb['cwe-cve'] = cwe_cve
capecdb['cwe-capec'] = cwe_capec
capecdb['capec-cwe'] = capec_cwe
capecdb.close()
if len(argv) - 1 == 2 and path.exists(argv[1]):
start(argv[1], argv[2])
else:
print('Error, please provide the paths of eve.json and capecdb')