Blame view
MLTD/src/OnlinePrediction.py
10.1 KB
0d8c0f816 initial commit |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# -*- coding: utf-8 -*-
"""Online MLTD risk prediction driven by MQTT security events.

Event alarms received on an MQTT topic are buffered until the buffered
timestamps span at least one model time segment.  The window is then scored
with the trained model and, when the highest risk exceeds the configured
threshold, the incident is reported through ReportSyslog and ReportTimeDB.
"""
import time
import json
import dateutil
import dateutil.parser  # `import dateutil` alone does not load the parser submodule
import numpy as np
import pandas as pd
import Utils

try:
    import cPickle as pickle
except ImportError:
    import pickle
import PredictionKorvesis as pdm
import paho.mqtt.client as paho
import ReportTimeDB
import ReportSyslog
import logging
import logging.config
import yaml
import os
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

MYDIR = os.path.dirname(os.path.realpath(__file__))
LOGGING_CONF_FILE = os.path.join(MYDIR, "logging.yml")


class OnlinePrediction:
    """Buffer incoming event alarms and trigger risk predictions on them."""

    def __init__(self):
        """Create empty event buffers and configure the "mltd-online" logger."""
        # Events accumulated since the last prediction was triggered.
        self.data_dates = []
        self.data_values = []
        # Overwritten with the addresses of the most recent event received.
        self.source_ip = "10.0.2.15"
        self.target_ip = "10.0.2.15"
        # Set to True by on_connect once the broker accepts the connection.
        self.Connected = False
        self.read_log_conf(LOGGING_CONF_FILE)
        self.logger = logging.getLogger("mltd-online")
        self.logger.info("Online MLTD is running")

    def read_log_conf(self, yaml_file):
        """Configure the logging module from the YAML file *yaml_file*."""
        with open(yaml_file) as f:
            logging.config.dictConfig(yaml.safe_load(f))

    def on_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: flag the connection as up when *rc* is 0."""
        if rc == 0:
            self.logger.debug("Connected to broker")
            self.Connected = True  # Signal connection
        else:
            self.logger.error("Connection failed")

    def on_message(self, client, userdata, message):
        """MQTT message callback: buffer the alarms of the monitored asset.

        The payload is expected to be UTF-8 encoded JSON shaped like:

        {
          "asset_id": "string",
          "timestamp": "2020-02-27T13:40:18.224Z",
          "event_alarm": [
            {
              "event_alarm_id": "string",
              "event_alarm_char": "string",
              "name": "string",
              "source_ip": "string",
              "source_port": 0,
              "destination_ip": "string",
              "destination_port": 0,
              "priority": 0,
              "confidence": 0
            }
          ]
        }

        Messages whose "asset_id" differs from ``self.asset_id`` are ignored.

        :param client: MQTT client that received the message (unused)
        :param userdata: user data attached to the client (unused)
        :param message: received message; only ``message.payload`` is read
        :return: None
        """
        # Decode and parse the payload once instead of once per use.
        payload = json.loads(message.payload.decode("UTF-8"))
        self.logger.debug("Event received: %s", payload)

        if payload["asset_id"] != self.asset_id:
            return

        data_dates = []
        data_values = []
        for alarm in payload["event_alarm"]:
            # Every alarm in a message shares the message-level timestamp.
            data_dates.append(payload["timestamp"])
            self.source_ip = alarm["source_ip"]
            self.target_ip = alarm["destination_ip"]
            data_values.append(alarm["event_alarm_id"])
        self.do_the_monitoring(data_dates, data_values)

    def do_the_monitoring(self, data_dates=None, data_values=None):
        """Append new events to the buffer and predict once it is long enough.

        A prediction is triggered when the time between the first and the
        last buffered timestamp is at least ``self.ts_seconds``; the buffer is
        emptied at that point.  Otherwise the events are only stored.

        :param data_dates: timestamp strings of the new events
        :param data_values: event ids of the new events, aligned with
            *data_dates*
        :return: None
        """
        data_values = self.data_values + ([] if data_values is None else list(data_values))
        data_dates = self.data_dates + ([] if data_dates is None else list(data_dates))
        if not data_dates:
            # Nothing buffered and nothing new: no window to evaluate.
            return

        first_date = dateutil.parser.parse(data_dates[0])
        last_date = dateutil.parser.parse(data_dates[-1])
        duration = (last_date - first_date).total_seconds()

        if duration < self.ts_seconds:
            # Window still too short: keep buffering.  Returning here avoids
            # calling max() on an empty prediction list, which used to raise
            # ValueError on every message that did not trigger a prediction.
            self.data_values = data_values
            self.data_dates = data_dates
            return

        self.data_values = []
        self.data_dates = []
        self.logger.info(f"Events Received: {len(data_values)}"
                         f" - Duration: {round(duration,2)} secs")
        self.logger.info("Prediction triggered")
        predictions = self.predict(data_dates, data_values)
        if len(predictions) == 0:
            self.logger.debug("No predictions produced; nothing to report")
            return

        risk = max(predictions)
        if risk > self.sigmoid_threshold:
            timeframe = Utils.sigmoid_mins(
                risk,
                self.rf_s,
                Utils.convert_hours_to_mins(Utils.strtime_to_hours(self.rf_midpoint)),
                self.hours_before,
            )
            # NOTE(review): the value comes from Utils.sigmoid_mins but is
            # logged as "secs" - confirm the unit.
            self.logger.info(f"A prominent security incident is predicted"
                             f" - Risk level: {round(risk,2)}"
                             f" - Expected timeframe: {round(timeframe,2)} secs")
            ReportSyslog.report(
                self.asset_id, risk * 100, timeframe, self.source_ip, self.target_ip
            )
            ReportTimeDB.report(
                self.time_db_client, self.asset_id, risk * 100, timeframe
            )
            self.logger.info(f"The incident was reportered on TimescaleDB and Rsyslog server")
        else:
            self.logger.info(
                f"The predicted risk {round(risk,2)} is "
                f"below the alarm threshold {round(self.sigmoid_threshold,2)}"
            )

    def predict(self, data_dates=None, data_values=None):
        """Score a window of events with the loaded model.

        :param data_dates: timestamps of the events in the window
        :param data_values: event ids aligned with *data_dates*
        :return: whatever ``pdm.predict`` returns for the window
        """
        dataset = pd.DataFrame(
            {
                "Timestamps": [] if data_dates is None else data_dates,
                "Event_id": [] if data_values is None else data_values,
            }
        )
        predictions = pdm.predict(
            self.regr, dataset, self.time_segments, self.feature_importance.index
        )
        self.logger.debug(f"Risk predictions: {predictions}")
        return predictions

    def form_dataset(self, dates_list, events_list, feature_importance):
        """Build a timestamp-indexed DataFrame of the known events.

        Only events whose id appears in ``feature_importance.index`` are kept,
        and rows with a duplicated timestamp are dropped (first one wins).

        :param dates_list: timestamps, aligned with *events_list*
        :param events_list: event ids
        :param feature_importance: object whose ``index`` holds the event ids
            known to the model
        :return: DataFrame with columns "Timestamps" and "Event_id"
        """
        # Create a Pandas dataframe with all the non zero event ids
        # TODO handle differently the zero event ids based on some policy
        loc = 0
        dataset = pd.DataFrame(columns=["Timestamps", "Event_id"])
        # NOTE(review): this guard sums the event ids, so it presumably expects
        # numeric ids - confirm the intent against the callers.
        if len(events_list) != abs(sum(events_list)):
            known_events = feature_importance.index
            for i, event_id in enumerate(events_list):
                # `Index.contains` was removed in pandas 1.0; `in` is the
                # equivalent membership test and works on every version.
                if i < len(dates_list) and event_id in known_events:
                    dataset.loc[loc] = pd.Series(
                        {"Timestamps": dates_list[i], "Event_id": event_id}
                    )
                    loc += 1
        if not dataset.empty:
            # keep only the first row of every duplicated timestamp
            dataset.drop_duplicates(subset="Timestamps", keep="first", inplace=True)
            dataset.set_index(
                pd.to_datetime(dataset["Timestamps"]), drop=False, inplace=True
            )
        self.logger.debug(f"Formed dataest: {dataset}")
        return dataset

    def load_data(self, filename):
        """Load the seven pickled training artefacts stored in *filename*.

        The objects are read in the order they were dumped.

        :param filename: path of the binary training file
        :return: tuple ``(pat_length, weak_bins_mapping, mp,
            train_dataset_values, regr, feature_importance,
            artificial_events_generation)``
        """
        # SECURITY: pickle can execute arbitrary code while loading; only use
        # this with training files produced by a trusted source.
        with open(filename, "rb") as infile:  # closed even if unpickling fails
            pat_length = pickle.load(infile)
            weak_bins_mapping = pickle.load(infile)
            mp = pickle.load(infile)
            train_dataset_values = np.array(pickle.load(infile))
            regr = pickle.load(infile)
            feature_importance = pickle.load(infile)
            artificial_events_generation = pickle.load(infile)
        return (
            pat_length,
            weak_bins_mapping,
            mp,
            train_dataset_values,
            regr,
            feature_importance,
            artificial_events_generation,
        )

    def start_online_prediction_MQTT(
        self,
        trainID,
        broker_address,
        port,
        mqtt_topic,
        prediction_threshold,
        report_time_db_host,
        report_time_db_port,
        report_time_db_username,
        report_time_db_password,
        report_time_db_database,
        report_time_db_table,
        report_time_db_ssl,
        report_asset_id,
    ):
        """Load the trained model and serve predictions until interrupted.

        Reads the model attributes of *trainID* from "pdm.sqlite", loads the
        pickled artefacts from "train_<trainID>.dat", connects to the report
        database and to the MQTT broker, subscribes to *mqtt_topic* and then
        blocks until a KeyboardInterrupt is received.

        :param trainID: identifier of the trained model to load
        :param broker_address: MQTT broker host
        :param port: MQTT broker port
        :param mqtt_topic: topic carrying the event alarms
        :param prediction_threshold: risk above which an incident is reported
        :param report_time_db_host: report database host
        :param report_time_db_port: report database port
        :param report_time_db_username: report database user
        :param report_time_db_password: report database password
        :param report_time_db_database: report database name
        :param report_time_db_table: report table name (stored only)
        :param report_time_db_ssl: passed through to ``ReportTimeDB.connect``
        :param report_asset_id: only messages for this asset are processed
        """
        self.sigmoid_threshold = prediction_threshold
        self.time_db_host = report_time_db_host
        self.time_db_port = report_time_db_port
        self.time_db_username = report_time_db_username
        self.time_db_password = report_time_db_password
        self.time_db_database = report_time_db_database
        self.time_db_table = report_time_db_table
        self.time_db_ssl = report_time_db_ssl
        self.asset_id = report_asset_id

        # NOTE(review): the connection is never closed here - confirm whether
        # Utils expects the caller to release it.
        sql_conn = Utils.create_sqlite_connection("pdm.sqlite")
        self.time_segments = Utils.select_model_attribute(
            sql_conn, trainID, "time_segments"
        )
        self.rf_s = Utils.select_model_attribute(sql_conn, trainID, "rf_s")
        self.rf_midpoint = Utils.select_model_attribute(
            sql_conn, trainID, "rf_midpoint"
        )
        self.hours_before = Utils.select_model_attribute(
            sql_conn, trainID, "hours_before"
        )

        # Length of one model time segment, used as the buffering window.
        ts_hours = Utils.strtime_to_hours(self.time_segments)
        self.ts_seconds = ts_hours * 3600

        # load the data from pickle (binary) files - should consider to move
        # to a database solution(?)
        (
            self.pat_length,
            self.weak_bins_mapping,
            self.mp,
            self.dataset_values,
            self.regr,
            self.feature_importance,
            self.artificial_events_generation,
        ) = self.load_data("train_" + str(trainID) + ".dat")

        self.time_db_client = ReportTimeDB.connect(
            self.time_db_host,
            self.time_db_port,
            self.time_db_database,
            self.time_db_username,
            self.time_db_password,
            self.time_db_ssl,
        )

        self.Connected = False
        # The timestamp suffix keeps the client id unique across restarts.
        client = paho.Client("Prediction_client" + str(time.time()))
        client.on_connect = self.on_connect
        client.on_message = self.on_message
        client.connect(broker_address, port=port)
        client.loop_start()  # network loop runs in a background thread
        while not self.Connected:  # wait for on_connect to signal success
            time.sleep(0.1)
        client.subscribe(mqtt_topic)
        try:
            # Messages are handled by the background loop; just stay alive.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.debug("exiting")
            client.disconnect()
            client.loop_stop()


if __name__ == "__main__":
    op = OnlinePrediction()
    # SECURITY: host and database credentials are hard-coded below; they
    # should be supplied through configuration or the environment instead.
    op.start_online_prediction_MQTT(
        10,
        "localhost",
        1884,
        "auth/incidents",
        0.1,
        "83.212.116.5",
        5432,
        "postgres",
        "xs?Z7HsY",
        "kea",
        "mltd",
        False,
        "server",
    )