Blame view

RabbitMQ_sub/src/xlsiem_rabbitmq_client.py 4.49 KB
4d8ee1534   Thanasis Naskos   adding rabbitmq c...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  # XL-SIEM RabbitMQ client: consumes messages through MessageReader, groups
  # the alarms they carry into short batches and POSTs each batch to the KEA
  # web service, authenticating with a Keycloak-issued bearer token.
  __author__ = "Jesus Villalobos, Ruben Trapero"
  __copyright__ = "Copyright (C) 2020 ATOS"
  __license__ = "LGPLv3"
  __version__ = "1.0"
  
  import atexit
  import json
  import re
  import base64
  import requests
  import time
  from messageReader import MessageReader
  from multiprocessing import Process
  
  # Events whose timestamps are less than this many seconds after the first
  # event of the current batch are merged into that batch.
  _BATCH_WINDOW_SECONDS = 10
  # Upper bound for every HTTP call so a stalled peer cannot block the consumer.
  _HTTP_TIMEOUT_SECONDS = 10
  _KEA_XLSIEM_URL = "http://webserver/api/v1/xlsiem"
  _KEYCLOAK_TOKEN_URL = "https://keycloak.fphag.curex-project.eu/auth/realms/master/protocol/openid-connect/token"
  # Extracts the text of the "[Classification: ...]" tag from a decoded log line.
  _CLASSIFICATION_RE = re.compile(r".*\[\*\*\]\s\[Classification:\s(.*)\]\s\[")


  def _fetch_access_token():
      """Request a fresh access token from Keycloak and return it.

      Raises:
          requests.RequestException: on network failure or a non-2xx answer.
          KeyError: if the answer carries no ``access_token`` field.
      """
      # SECURITY(review): credentials are hard-coded in source; they should be
      # moved to configuration/secret storage and rotated.
      headers = {'Content-Type': 'application/x-www-form-urlencoded'}
      auth_body = {
          "username": "testUser",
          "password": "73T$a2W3ght!8X1721R5",
          "grant_type": "password",
          "client_id": "KEA",
      }
      response = requests.request(
          "POST", _KEYCLOAK_TOKEN_URL, headers=headers, data=auth_body,
          timeout=_HTTP_TIMEOUT_SECONDS)
      response.raise_for_status()
      return json.loads(response.text)['access_token']


  def _parse_alarm(body):
      """Turn a raw queue payload into ``(classification, message)``.

      ``body`` is a JSON document whose ``"message"`` field is itself a JSON
      string holding an ``"event"`` object; the event's ``"log"`` field is
      base64 encoded. Returns ``None`` when the decoded log carries no
      ``[Classification: ...]`` tag. Malformed payloads raise (``KeyError``,
      ``ValueError``, ...) and are handled by the caller.
      """
      # Decode the nested JSON once instead of once per field.
      fields = json.loads(json.loads(body)["message"])["event"]
      raw_log = base64.b64decode(fields["log"].encode('ascii'))
      # Stray non-ASCII bytes must not discard an otherwise valid alarm.
      log = raw_log.decode('ascii', errors='replace')

      classification_match = _CLASSIFICATION_RE.match(log)
      if classification_match is None:
          return None
      event = classification_match.group(1)

      message = {
          "asset_id": "server",
          "timestamp": int(fields["date"]),
          "event_alarm": [
              {
                  "event_alarm_id": event,
                  "event_alarm_char": event,
                  "name": "xlsiem_event",
                  "source_ip": fields["src_ip"],
                  "source_port": int(fields["src_port"]),
                  "destination_ip": fields["dst_ip"],
                  "destination_port": int(fields["dst_port"]),
                  "priority": 0,
                  "confidence": 0,
              },
          ],
      }
      return event, message


  def _post_batch(batch, token):
      """POST one batch to the KEA endpoint with ``token`` as bearer token."""
      headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {token}'}
      return requests.request(
          "POST", _KEA_XLSIEM_URL, headers=headers, data=json.dumps(batch),
          timeout=_HTTP_TIMEOUT_SECONDS)


  def callback(ch, method, properties, body):
      """Handle one queue message: batch its alarm and flush expired batches.

      The first alarm opens a batch (``first_message``). Later alarms whose
      timestamp is within the batch window are merged into it, one entry per
      distinct classification (tracked in ``unique_events``). The first alarm
      outside the window causes the batch to be POSTed to KEA and then opens
      the next batch itself, so it is not lost. When KEA answers 401 the
      access token is refreshed once and the POST is retried.

      Args:
          ch, method, properties: delivery arguments supplied by the message
              reader; unused here.
          body: raw JSON payload (``str`` or ``bytes``) of the message.

      Never raises: any failure is printed and the message is skipped, so a
      bad payload or an unreachable service cannot stop the consumer. If the
      POST itself fails, the pending batch is kept and retried on the next
      out-of-window message.
      """
      global first_message
      global unique_events
      global access_token

      try:
          parsed = _parse_alarm(body)
          if parsed is None:
              print("Skipping event without a [Classification: ...] tag")
              return
          event, message = parsed

          if not first_message:
              first_message = message
              unique_events.add(event)
          elif message["timestamp"] - first_message["timestamp"] < _BATCH_WINDOW_SECONDS:
              if event not in unique_events:
                  first_message["event_alarm"] += message["event_alarm"]
                  unique_events.add(event)
          else:
              print(unique_events)
              response = _post_batch(first_message, access_token)
              if response.status_code == 401:
                  # requests does not raise on HTTP errors, so an expired
                  # token has to be detected from the status code.
                  access_token = _fetch_access_token()
                  response = _post_batch(first_message, access_token)
              print(f"Event sent to KEA: {response}")
              # The alarm that closed the window starts the next batch.
              first_message = message
              unique_events = {event}
          # NOTE: the message is deliberately not acknowledged here:
          # ch.basic_ack(delivery_tag=method.delivery_tag)

      except Exception as e:
          # Top-level boundary of the consumer: report and carry on.
          print(e)
  
  
  def clean():
      """Delegate to ``reader.clean()``; registered below as an ``atexit`` hook."""
      reader.clean()
  
  def cleaner():
      """Flush any pending batch to KEA once a minute; never returns.

      Every 60 seconds, if ``first_message`` holds a batch, it is POSTed to the
      KEA endpoint and the batch state (``first_message``, ``unique_events``)
      is reset. The bearer token is attached when the module-level
      ``access_token`` exists, matching what ``callback`` sends. A failed POST
      is reported and the batch is kept for the next round instead of ending
      the loop.

      NOTE(review): the disabled launch code starts this through
      ``multiprocessing.Process``; a child process works on its own copy of
      the module globals, so it would not see batches built by ``callback``
      in the parent - confirm the intended execution model before enabling.
      """
      global first_message
      global unique_events

      url = "http://webserver/api/v1/xlsiem"
      while True:
          print('cleaner: starting')
          if first_message:
              headers = {'Content-Type': 'application/json'}
              # The token may not exist yet if this runs before start-up
              # authentication has completed.
              token = globals().get("access_token")
              if token:
                  headers['Authorization'] = f'Bearer {token}'
              payload = json.dumps(first_message)
              try:
                  # Bounded wait so a stalled server cannot freeze the loop.
                  response = requests.request(
                      "POST", url, headers=headers, data=payload, timeout=10)
              except requests.RequestException as e:
                  print(f"cleaner: failed to send event to KEA: {e}")
              else:
                  print(f"Event sent to KEA: {response}")
                  first_message = {}
                  unique_events = set()
          print('cleaner: finished')
          time.sleep(60)
  
  # Shared batching state, read and rebound by callback() and cleaner():
  # classifications already present in the pending batch ...
  unique_events = set()
  # ... and the pending batch itself; an empty dict means no batch is open.
  first_message = {}
  
8c8b3bf08   naskoos   authenticating ke...
99
100
  #p2 = Process(target=cleaner)
  #p2.start()
4d8ee1534   Thanasis Naskos   adding rabbitmq c...
101
102
103
104
105
106
  
  # time.sleep(30)
  # with open("events.log","r") as f:
  #      for line in f.readlines():
  #          callback(None,None,None,line)
  
8c8b3bf08   naskoos   authenticating ke...
107
108
109
110
111
112
113
114
115
116
117
  # Obtain the initial Keycloak access token that callback() sends as a
  # Bearer token. This runs once, at start-up, before the reader is started.
  # SECURITY(review): the credentials below are hard-coded in source; they
  # should be moved to configuration/secret storage and rotated.
  headers = {'Content-Type': 'application/x-www-form-urlencoded'}
  authBody = {
      "username": "testUser",
      "password": "73T$a2W3ght!8X1721R5",
      "grant_type": "password",
      "client_id": "KEA"
  }
  url = "https://keycloak.fphag.curex-project.eu/auth/realms/master/protocol/openid-connect/token"
  # Bounded wait: without a timeout an unreachable Keycloak hangs start-up forever.
  response = requests.request("POST", url, headers=headers, data=authBody, timeout=10)
  # Surface a rejected login as an HTTP error instead of an opaque KeyError
  # on the missing 'access_token' field.
  response.raise_for_status()
  access_token = json.loads(response.text)['access_token']
  
4d8ee1534   Thanasis Naskos   adding rabbitmq c...
118
119
120
121
122
  # Connect to the broker and hand every delivered message to callback().
  reader = MessageReader()
  reader.connect()
  # Register the cleanup hook BEFORE starting to read: presumably
  # reader.read() blocks while consuming (TODO confirm against MessageReader),
  # in which case a registration placed after it would not be in effect when
  # the process exits during consumption.
  atexit.register(clean)
  reader.read(callback)