diff --git a/notifications b/notifications index 6474df6..d0cdfa9 100644 --- a/notifications +++ b/notifications @@ -1,217 +1,218 @@ #!/usr/bin/env python3 # ------------------------------------------------------------- # Notifications center - CLI client # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # Author: Sébastien Santoro aka Dereckson # Project: Nasqueron # Created: 2016-01-27 # Dependencies: Pika, direct access to the broker # ------------------------------------------------------------- """ This module connects to the message broker, subscribes to the notifications exchange, consumes messages, prints them on the console. """ import configparser import json import sys import time import pika # ------------------------------------------------------------- # Helper functions to parse config and connect to the broker # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def get_config(): """Get a parser to read default the configuration file.""" config = configparser.ConfigParser() config.read('/usr/local/etc/notifications.conf') return config def get_credentials(config): """Get credentials to connect to the broker from the configuration.""" return pika.PlainCredentials( username=config['Broker']['User'], password=config['Broker']['Password'], erase_on_connect=True ) def get_broker_connection(config): """Connect to the broker.""" parameters = pika.ConnectionParameters( host=config['Broker']['Host'], virtual_host=config['Broker']['Vhost'], credentials=get_credentials(config) ) return pika.BlockingConnection(parameters) def get_exchange(config): """Get exchange point name from the configuration.""" return config['Broker']['Exchange'] def get_broker_queue(channel, exchange): """Ensure exchange exists and declare a temporary queue.""" channel.exchange_declare( exchange=exchange, exchange_type='topic', durable=True) result = channel.queue_declare(exclusive=True) return result.method.queue # ------------------------------------------------------------- # Helper functions to format the output # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def get_notification_format(): """Get the format to use to print the notification.""" return "[{time}] <{project}/{group}> {text}" def get_notification_text(notification): """Append when needed the notification link to the text return a string.""" text = notification['text'] if notification['link']: text += " — " + notification['link'] return text def format_notification(notification_message): """Format the notification as a string from a JSON message.""" notification = json.loads(notification_message) return get_notification_format().format( time=time.strftime("%b %d %H:%M:%S"), project=notification['project'], group=notification['group'], text=get_notification_text(notification) ) # ------------------------------------------------------------- # Callbacks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def on_broker_message(channel, basic_deliver, properties, body): """Callback used when a new message have been received from the queue.""" notification = format_notification(body.decode("utf-8")) print(notification) sys.stdout.flush() channel.basic_ack(basic_deliver.delivery_tag) # ------------------------------------------------------------- # Services providers # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def get_connection(config): """Initialize and provide a connection to the broker""" try: return get_broker_connection(config) except pika.exceptions.ProbableAccessDeniedError: print( "Can't login to the broker: it's probably an access denied case.", file=sys.stderr ) sys.exit(2) def get_channel(config): """Initialize and provide a connection channel.""" return get_connection(config).channel() def get_queue(channel, exchange): """Initialize and provide a broker queue for specified exchange.""" try: return get_broker_queue(channel, exchange) except pika.exceptions.ChannelClosed as exception: print("Channel error: {0}".format(exception)) sys.exit(4) # ------------------------------------------------------------- # Services container # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def initialize_application(): """Initialize a container with required services.""" container = {} container['config'] = get_config() container['options'] = {} container['channel'] = get_channel(container['config']) container['exchange'] = get_exchange(container['config']) container['queue'] = get_queue(container['channel'], container['exchange']) return container # ------------------------------------------------------------- # Main tasks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def parse_arguments(options): """Parse arguments and fill an options array.""" # Todo: allows an option --routing-key options['BindingKeys'] = ['#'] def subscribe_to_notifications(options, channel, exchange, queue): """Subscribe to notifications for specified topics.""" for binding_key in options['BindingKeys']: subscribe_to_topic(binding_key, channel, exchange, queue) def subscribe_to_topic(binding_key, channel, exchange, queue): """Subscribe to notifications for one specified topic.""" try: channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key) except pika.exceptions.ChannelClosed as exception: print("Channel error: {0}".format(exception)) sys.exit(8) def consume_notifications(channel, queue): """Consume notifications from a queue and call our callback method when a message is received. """ channel.basic_consume(on_broker_message, queue=queue) channel.start_consuming() # ------------------------------------------------------------- # Application entry point # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def run_application(): """Run the application.""" app = initialize_application() parse_arguments(app['options']) subscribe_to_notifications( app['options'], app['channel'], app['exchange'], app['queue'] ) consume_notifications(app['channel'], app['queue']) -run_application() +if __name__ == "__main__": + run_application()