Blame view
RabbitMQ_sub/src/messageReader.py
2.94 KB
4d8ee1534 adding rabbitmq c... |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
__author__ = "Jesus Villalobos, Ruben Trapero" __copyright__ = "Copyright (C) 2022 ATOS" __license__ = "LGPLv3" __version__ = "1.0" import sys import pika from pika.credentials import ExternalCredentials import time import ssl import os class MessageReader: def __init__(self): self.SERVER_IP = os.getenv('SERVER_IP', '95.168.171.182') self.SSL_PORT = os.getenv('SSL_PORT', 8007) self.CONSUMER_NAME = os.getenv('CONSUMER_NAME', 'kea') self.EXCHANGE_QUEUE_NAME = os.getenv('EXCHANGE_QUEUE_NAME', 'atos.xlsiem.exchange.events') self.CONSUMER_QUEUE_NAME = os.getenv('CONSUMER_QUEUE_NAME', 'atos.xlsiem.queue.events') + "." + self.CONSUMER_NAME self.CA_CERT = os.getenv('CA_CERT', './ssl_kea/cacert.pem') self.CLIENT_CERT = os.getenv('CLIENT_CERT', './ssl_kea/cert.pem') self.CLIENT_KEY = os.getenv('CLIENT_KEY', './ssl_kea/key.pem') def connect(self): SSL_OPTIONS = dict( ssl_version=ssl.PROTOCOL_TLSv1_2, ca_certs=self.CA_CERT, keyfile=self.CLIENT_KEY, certfile=self.CLIENT_CERT, cert_reqs=ssl.CERT_REQUIRED) credentials = ExternalCredentials() self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.SERVER_IP, port=self.SSL_PORT, ssl=True, ssl_options=SSL_OPTIONS, socket_timeout=1, credentials=credentials)) self.channel = self.connection.channel() # Creates a Queue self.channel.queue_declare(queue=self.CONSUMER_QUEUE_NAME, durable=True, auto_delete=False) # Binds the created queue to the exchange, so every message received in the exchange will be copied into our # queue self.channel.queue_bind(self.CONSUMER_QUEUE_NAME, self.EXCHANGE_QUEUE_NAME, routing_key=None, arguments=None) def read(self, callback): while True: try: # Configure the channel to consume messages from the queue: consumer_tag = self.channel.basic_consume(callback, queue=self.CONSUMER_QUEUE_NAME, no_ack=False, exclusive=False) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming() self.connection.close() except pika.exceptions.ConnectionClosed: print("Connection error. Trying again...") time.sleep(5) except KeyboardInterrupt: print('Program finished from keyboard') sys.exit(0) except: print("Unexpected error:", str(sys.exc_info())) sys.exit(1) def clean(self): print('Deleting consumer queue: ' + self.CONSUMER_QUEUE_NAME) #self.channel.queue_delete(queue=self.CONSUMER_QUEUE_NAME) |