Blame view

RabbitMQ_sub/src/messageReader.py 2.94 KB
4d8ee1534   Thanasis Naskos   adding rabbitmq c...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
  # Module metadata: authorship, copyright holder, license and version.
  __author__ = "Jesus Villalobos, Ruben Trapero"
  __copyright__ = "Copyright (C) 2022 ATOS"
  __license__ = "LGPLv3"
  __version__ = "1.0"
  
  import sys
  import pika
  from pika.credentials import ExternalCredentials
  import time
  import ssl
  import os
  
  
  class MessageReader:
      """Consume messages from a RabbitMQ queue over a TLS connection.

      All settings are read from environment variables when the instance is
      created; each one falls back to a built-in default. Typical use is
      ``connect()`` followed by ``read(callback)``.

      NOTE(review): the ``ssl=True`` / ``ssl_options=dict(...)`` connection
      arguments and the ``basic_consume(callback, ..., no_ack=...)`` signature
      look like the pre-1.0 pika API -- confirm against the pinned pika version
      before upgrading the dependency.
      """

      def __init__(self):
          """Load broker, queue and certificate settings from the environment."""
          self.SERVER_IP = os.getenv('SERVER_IP', '95.168.171.182')
          # Environment values are always strings, so coerce the port to int;
          # an empty value is treated the same as an unset one.
          self.SSL_PORT = int(os.getenv('SSL_PORT') or 8007)
          self.CONSUMER_NAME = os.getenv('CONSUMER_NAME', 'kea')
          self.EXCHANGE_QUEUE_NAME = os.getenv('EXCHANGE_QUEUE_NAME', 'atos.xlsiem.exchange.events')
          # The consumer name is appended so that each consumer gets its own queue name.
          self.CONSUMER_QUEUE_NAME = os.getenv('CONSUMER_QUEUE_NAME', 'atos.xlsiem.queue.events') + "." + self.CONSUMER_NAME
          self.CA_CERT = os.getenv('CA_CERT', './ssl_kea/cacert.pem')
          self.CLIENT_CERT = os.getenv('CLIENT_CERT', './ssl_kea/cert.pem')
          self.CLIENT_KEY = os.getenv('CLIENT_KEY', './ssl_kea/key.pem')

      def connect(self):
          """Open the TLS connection, then declare and bind the consumer queue.

          Sets ``self.connection`` and ``self.channel``. Any exception raised
          by pika while connecting or declaring is propagated to the caller.
          """
          SSL_OPTIONS = dict(
              ssl_version=ssl.PROTOCOL_TLSv1_2,
              ca_certs=self.CA_CERT,
              keyfile=self.CLIENT_KEY,
              certfile=self.CLIENT_CERT,
              cert_reqs=ssl.CERT_REQUIRED)
          # No username/password: ExternalCredentials is used together with
          # the client certificate configured above.
          credentials = ExternalCredentials()
          self.connection = pika.BlockingConnection(
              pika.ConnectionParameters(host=self.SERVER_IP, port=self.SSL_PORT, ssl=True, ssl_options=SSL_OPTIONS,
                                        socket_timeout=1,
                                        credentials=credentials))
          self.channel = self.connection.channel()
          # Creates a Queue
          self.channel.queue_declare(queue=self.CONSUMER_QUEUE_NAME, durable=True, auto_delete=False)
          # Binds the created queue to the exchange, so every message received in the exchange will be copied into our
          # queue
          self.channel.queue_bind(self.CONSUMER_QUEUE_NAME, self.EXCHANGE_QUEUE_NAME, routing_key=None, arguments=None)

      def read(self, callback):
          """Consume messages from the consumer queue until stopped.

          ``connect()`` must have been called first. ``callback`` is handed to
          ``basic_consume`` and is invoked by pika for every delivered message;
          messages are consumed with ``no_ack=False``.

          Returns once consuming stops normally (the connection is closed
          first). When the connection is lost, waits five seconds, reconnects
          and resumes. Ctrl+C ends the process with exit status 0; any other
          error is printed and ends the process with exit status 1.
          """
          reconnect = False
          while True:
              try:
                  if reconnect:
                      # The previous connection and its channel are closed and
                      # cannot be reused, so build fresh ones before consuming.
                      self.connect()
                      reconnect = False
                  # Configure the channel to consume messages from the queue:
                  self.channel.basic_consume(callback, queue=self.CONSUMER_QUEUE_NAME, no_ack=False,
                                             exclusive=False)
                  print(' [*] Waiting for messages. To exit press CTRL+C')
                  self.channel.start_consuming()
                  self.connection.close()
                  # Consuming ended cleanly; looping again would only fail on
                  # the channel that was just closed.
                  return
              except pika.exceptions.ConnectionClosed:
                  print("Connection error. Trying again...")
                  time.sleep(5)
                  reconnect = True
              except KeyboardInterrupt:
                  print('Program finished from keyboard')
                  sys.exit(0)
              except Exception:
                  # Exception (not a bare except) so that SystemExit raised
                  # inside the callback is not rewritten into exit status 1.
                  print("Unexpected error:", str(sys.exc_info()))
                  sys.exit(1)

      def clean(self):
          """Print the name of the consumer queue; the queue itself is left untouched."""
          print('Deleting consumer queue: ' + self.CONSUMER_QUEUE_NAME)
          # NOTE(review): the actual deletion is disabled, so the message above
          # is printed without any queue being removed -- confirm this is intended.
          #self.channel.queue_delete(queue=self.CONSUMER_QUEUE_NAME)