# -*- coding: utf-8 -*-
import psycopg2
import psycopg2.extras
from datetime import datetime, timedelta
import traceback
import sys
import pandas as pd
import Utils
import logging
import logging.config
import yaml
import os
try:
    import cPickle as pickle  # Python 2 fallback
except ImportError:
    import pickle
import PredictionKorvesis as pdm
MYDIR = os.path.dirname(os.path.realpath(__file__))
LOGGING_CONF_FILE = os.path.join(MYDIR, "logging.yml")
def read_log_conf(yaml_file):
with open(yaml_file) as f:
logging.config.dictConfig(yaml.safe_load(f))
read_log_conf(LOGGING_CONF_FILE)
logger = logging.getLogger("mltd-offline")
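# The logging configuration is read from logging.yml next to this file. The
# real file may differ; a minimal dictConfig-style configuration defining the
# "mltd-offline" logger used below could look like this (illustrative only):
#
#   version: 1
#   formatters:
#     simple:
#       format: "%(asctime)s %(name)s %(levelname)s %(message)s"
#   handlers:
#     console:
#       class: logging.StreamHandler
#       formatter: simple
#   loggers:
#     mltd-offline:
#       level: INFO
#       handlers: [console]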
def connect_time_db(
    time_db_host,
    time_db_port,
    time_db_username,
    time_db_password,
    time_db_ssl,
    time_db_dbname,
):
    # Open a connection to the timeseries database and return a dict-style
    # cursor. The port and SSL flag are forwarded to psycopg2; SSL is only
    # required when time_db_ssl is truthy.
    conn = psycopg2.connect(
        dbname=time_db_dbname,
        user=time_db_username,
        password=time_db_password,
        host=time_db_host,
        port=time_db_port,
        sslmode="require" if time_db_ssl else "prefer",
    )
    cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    return cursor
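# Example of obtaining a cursor (host, credentials and database name are
# illustrative; 5432 is the PostgreSQL default port):
# time_db_cursor = connect_time_db("localhost", 5432, "mltd", "secret", False, "sphinx_events")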
def fetch_training_data(
time_db_cursor,
explicit_security_incidents_dates,
hours_before,
asset_id,
adt_table,
xlsiem_table,
od_table,
):
    # fetch the dates of reported incidents for this asset from the ADT table;
    # the table name comes from configuration, values are passed as bind parameters
    time_db_cursor.execute(
        "SELECT time FROM " + adt_table + " where asset_id = %s",
        (asset_id,),
    )
    major_security_incidents_dates = time_db_cursor.fetchall()
    # append the explicitly provided incident dates to the fetched ones
    if explicit_security_incidents_dates:
        for date_str in explicit_security_incidents_dates:
            major_security_incidents_dates.append(
                [datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ"), "target_incident"]
            )
    major_security_incidents_dates = sorted(major_security_incidents_dates)
    # drop incidents that fall within hours_before of the previous one, so that
    # observation windows do not overlap (e.g. with a 4-hour window, incidents
    # at 10:00 and 12:00 keep only the 10:00 one)
    remove_candidates = []
    for i in range(len(major_security_incidents_dates) - 1, 0, -1):
        cur_row = major_security_incidents_dates[i]
        pre_row = major_security_incidents_dates[i - 1]
        if cur_row[0] <= pre_row[0] + timedelta(hours=Utils.strtime_to_hours(hours_before)):
            remove_candidates.append(cur_row)
    for row in remove_candidates:
        major_security_incidents_dates.remove(row)
dataset_dates = []
dataset_values = []
    for msi_date in major_security_incidents_dates:
        # fetch the XL-SIEM alarms reported for this asset from hours_before
        # before the incident onwards
        time_db_cursor.execute(
            "SELECT time, event_alarm_id FROM "
            + xlsiem_table
            + " where time > TIMESTAMP %s - interval %s and asset_id = %s",
            (
                msi_date[0].strftime("%Y-%m-%dT%H:%M:%SZ"),
                str(hours_before) + " hours",
                asset_id,
            ),
        )
        xlsiem_security_incidents = time_db_cursor.fetchall()
od_security_incidents = []
# time_db_cursor.execute(
# "SELECT time, event_alarm_id FROM "
# + od_table
# + " where time > TIMESTAMP '"
# + msi_date[0].strftime("%Y-%m-%dT%H:%M:%SZ")
# + "' - interval '"
# + str(hours_before)
# + " hours' and asset_id = '"
# + asset_id
# + "'"
# )
# od_security_incidents = time_db_cursor.fetchall()
fetched_security_incidents = sorted(xlsiem_security_incidents+od_security_incidents)
if len(fetched_security_incidents) > 0:
for sec_incident in fetched_security_incidents:
dataset_dates.append(sec_incident[0].strftime("%Y-%m-%dT%H:%M:%SZ"))
dataset_values.append(sec_incident[1])
dataset_dates.append(msi_date[0].strftime("%Y-%m-%dT%H:%M:%SZ"))
dataset_values.append("target_event")
return (dataset_dates, dataset_values)
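# fetch_training_data returns two parallel lists that together form the event
# sequence used for training: for each reported incident, the alarms observed
# in the window before it are emitted first and the incident itself is emitted
# last under the "target_event" marker. Illustrative shape (values made up):
#
#   dataset_dates  = ["2014-05-06T20:10:00Z", "2014-05-06T23:40:00Z", "2014-05-07T00:01:02Z"]
#   dataset_values = ["alarm_17",             "alarm_3",              "target_event"]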
def save_to_bin_file(data, filename):
    # append a single pickled record; do_the_training calls this repeatedly, so
    # the resulting .dat file holds a sequence of pickle records in write order
    with open(filename, "ab") as f:
        pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
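# A minimal sketch of how a consumer could read the appended records back in
# write order (the name load_from_bin_file is illustrative and not part of
# this module's interface):
#
# def load_from_bin_file(filename):
#     records = []
#     with open(filename, "rb") as f:
#         while True:
#             try:
#                 records.append(pickle.load(f))
#             except EOFError:
#                 break
#     return records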
def do_the_training(
trainID,
time_db_host,
time_db_port,
time_db_username,
time_db_password,
time_db_ssl,
time_db_dbname,
asset_id,
adt_table,
xlsiem_table,
od_table,
mp_thres_X,
mp_thres_Y,
mp_thres_Z,
pat_length,
s,
midpoint,
hours_before,
time_segments,
explicit_security_incidents_dates,
artificial_events_generation=False,
rre=False,
rfe=False,
kofe=False,
mil_over=False,
fs=False,
):
try:
logger.info("Training process started")
time_db_cursor = connect_time_db(
time_db_host,
time_db_port,
time_db_username,
time_db_password,
time_db_ssl,
time_db_dbname,
)
dataset_dates, dataset_values = fetch_training_data(
time_db_cursor,
explicit_security_incidents_dates,
hours_before,
asset_id,
adt_table,
xlsiem_table,
od_table,
)
target_event_id = "target_event"
dataset = pd.DataFrame(
{"Timestamps": dataset_dates, "Event_id": dataset_values}
)
logger.info("serializing pattern length")
save_to_bin_file(pat_length, "train_" + str(trainID) + ".dat")
        logger.info("serializing weak bins")
        save_to_bin_file({}, "train_" + str(trainID) + ".dat")  # placeholder record
        logger.info("serializing matrixProfile")
        save_to_bin_file({}, "train_" + str(trainID) + ".dat")  # placeholder record
logger.info("serializing measurements")
save_to_bin_file(dataset_values, "train_" + str(trainID) + ".dat")
regr, feature_importance = pdm.train_rf_model(
dataset, time_segments, target_event_id, s, midpoint, rre=rre, rfe=rfe, kofe=kofe, mil_over=mil_over, fs=fs,
) # time_segments='H'
logger.info("serializing random forest trained model")
save_to_bin_file(regr, "train_" + str(trainID) + ".dat")
logger.info("serializing random forest feature importance")
save_to_bin_file(feature_importance, "train_" + str(trainID) + ".dat")
logger.info("serializing flag artificial_events_generation")
save_to_bin_file(artificial_events_generation, "train_" + str(trainID) + ".dat")
logger.info("Training process finished")
        time_db_cursor.close()
        time_db_cursor.connection.close()
        return 0
    except Exception as err:
        logger.error(err)
        traceback.print_exc(file=sys.stdout)
        return 5
# failure_dates = ["2014-05-07T00:01:02Z","2014-07-06T00:01:07Z","2014-10-03T00:01:00Z","2014-12-21T00:00:59Z","2015-02-14T00:00:59Z","2015-04-15T00:01:07Z","2015-07-01T00:00:45Z","2015-08-24T00:00:55Z","2016-04-24T00:00:50Z"]
# do_the_training(2,'localhost',8086,'Axoom3','artificial_events',0.5,1,0.5,6,0.7,2,4,failure_dates)
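# The commented call above predates the current parameter list. An
# illustrative call matching the current signature (all values are examples
# only, not recommended settings):
# do_the_training(
#     trainID=2,
#     time_db_host="localhost",
#     time_db_port=5432,
#     time_db_username="mltd",
#     time_db_password="secret",
#     time_db_ssl=False,
#     time_db_dbname="sphinx_events",
#     asset_id="asset-01",
#     adt_table="adt_incidents",
#     xlsiem_table="xlsiem_alarms",
#     od_table="od_alarms",
#     mp_thres_X=0.5,
#     mp_thres_Y=1,
#     mp_thres_Z=0.5,
#     pat_length=6,
#     s=0.7,
#     midpoint=2,
#     hours_before=4,
#     time_segments="H",
#     explicit_security_incidents_dates=failure_dates,
# )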