messageReader.py 2.94 KB
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
# Module metadata: authors, copyright notice, license identifier and version.
__author__ = "Jesus Villalobos, Ruben Trapero"
__copyright__ = "Copyright (C) 2022 ATOS"
__license__ = "LGPLv3"
__version__ = "1.0"

import sys
import pika
from pika.credentials import ExternalCredentials
import time
import ssl
import os


class MessageReader:
    """Consume messages from a RabbitMQ queue over a TLS connection via pika.

    Every setting is read from an environment variable with a built-in
    fallback.  Typical use is ``connect()`` followed by ``read(callback)``.
    """

    def __init__(self):
        """Load connection settings from the environment (with defaults)."""
        self.SERVER_IP = os.getenv('SERVER_IP', '95.168.171.182')
        # Environment values are always strings, so convert explicitly: the
        # port must be an int whether or not SSL_PORT is set.
        self.SSL_PORT = int(os.getenv('SSL_PORT', '8007'))
        self.CONSUMER_NAME = os.getenv('CONSUMER_NAME', 'kea')
        self.EXCHANGE_QUEUE_NAME = os.getenv('EXCHANGE_QUEUE_NAME', 'atos.xlsiem.exchange.events')
        # The consumer name is appended so each consumer gets its own queue.
        self.CONSUMER_QUEUE_NAME = (
            os.getenv('CONSUMER_QUEUE_NAME', 'atos.xlsiem.queue.events') + "." + self.CONSUMER_NAME
        )
        self.CA_CERT = os.getenv('CA_CERT', './ssl_kea/cacert.pem')
        self.CLIENT_CERT = os.getenv('CLIENT_CERT', './ssl_kea/cert.pem')
        self.CLIENT_KEY = os.getenv('CLIENT_KEY', './ssl_kea/key.pem')
        # Populated by connect(); None means "not currently connected".
        self.connection = None
        self.channel = None

    def connect(self):
        """Open the TLS connection, declare the consumer queue and bind it.

        Sets ``self.connection`` and ``self.channel``.  Errors raised by pika
        or the ssl layer propagate to the caller.
        """
        # NOTE(review): ``ssl=True`` plus a dict of ``ssl_options`` matches the
        # pika 0.x ConnectionParameters signature -- confirm against the
        # pinned pika version before upgrading.
        ssl_options = dict(
            ssl_version=ssl.PROTOCOL_TLSv1_2,
            ca_certs=self.CA_CERT,
            keyfile=self.CLIENT_KEY,
            certfile=self.CLIENT_CERT,
            cert_reqs=ssl.CERT_REQUIRED,
        )
        credentials = ExternalCredentials()
        parameters = pika.ConnectionParameters(
            host=self.SERVER_IP,
            port=self.SSL_PORT,
            ssl=True,
            ssl_options=ssl_options,
            socket_timeout=1,
            credentials=credentials,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        # Create the (durable, non-auto-deleted) consumer queue.
        self.channel.queue_declare(queue=self.CONSUMER_QUEUE_NAME, durable=True, auto_delete=False)
        # Bind the queue to the exchange so every message received by the
        # exchange is copied into our queue.
        self.channel.queue_bind(self.CONSUMER_QUEUE_NAME, self.EXCHANGE_QUEUE_NAME, routing_key=None, arguments=None)

    def read(self, callback):
        """Consume messages from the consumer queue until consuming stops.

        ``callback`` is handed to ``basic_consume`` unchanged.  If no
        connection is open, one is established first.  On a pika connection
        error the stale handles are dropped and, after a 5 second pause, the
        connection is re-established and consuming resumes.

        Returns None once ``start_consuming()`` returns and the connection
        has been closed.  Exits the process with status 0 on CTRL+C and with
        status 1 on any other unexpected error.
        """
        while True:
            try:
                # (Re)connect lazily: covers both a first call without
                # connect() and a retry after the connection was lost.
                if getattr(self, 'connection', None) is None or getattr(self, 'channel', None) is None:
                    self.connect()
                # Configure the channel to consume messages from the queue.
                self.channel.basic_consume(callback, queue=self.CONSUMER_QUEUE_NAME, no_ack=False,
                                           exclusive=False)
                print(' [*] Waiting for messages. To exit press CTRL+C')
                self.channel.start_consuming()
                self.connection.close()
                self.connection = None
                self.channel = None
                # Consuming ended cleanly; looping again would only operate
                # on the connection that was just closed.
                return
            except pika.exceptions.AMQPConnectionError:
                # Base class of ConnectionClosed; also covers a failed
                # reconnect attempt so the retry loop keeps going.
                print("Connection error. Trying again...")
                # Forget the dead handles so the next iteration reconnects
                # instead of reusing a closed channel.
                self.connection = None
                self.channel = None
                time.sleep(5)
            except KeyboardInterrupt:
                print('Program finished from keyboard')
                sys.exit(0)
            except Exception:
                print("Unexpected error:", str(sys.exc_info()))
                sys.exit(1)

    def clean(self):
        """Announce removal of the consumer queue.

        NOTE(review): only the message is printed; the actual
        ``queue_delete`` call is disabled below, so the queue is left on the
        broker.  Confirm whether that is intended.
        """
        print('Deleting consumer queue: ' + self.CONSUMER_QUEUE_NAME)
        # self.channel.queue_delete(queue=self.CONSUMER_QUEUE_NAME)