Home
DevCentral
Search
Configure Global Search
Log In
Transactions
P167
Change Details
Change Details
Old
New
Diff
#!/usr/bin/env python3 # ------------------------------------------------------------- # Notifications center - CLI client # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # Author: Sébastien Santoro aka Dereckson # Project: Nasqueron # Created: 2016-01-27 # Description: Connects to the message broker, subscribes to # the notifications exchange, consumes messages, # prints them on the console # Dependencies: Pika, direct access to the broker # ------------------------------------------------------------- import configparser import json import pika import sys import time # ------------------------------------------------------------- # Helper functions to parse config and connect to the broker # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getConfig (): config = configparser.ConfigParser() config.read('/usr/local/etc/notifications.conf') return config def getCredentials(config): return pika.PlainCredentials( username=config['Broker']['User'], password=config['Broker']['Password'], erase_on_connect=True ) def getBrokerConnection(config): parameters = pika.ConnectionParameters( host=config['Broker']['Host'], virtual_host=config['Broker']['Vhost'], credentials=getCredentials(config) ) return pika.BlockingConnection(parameters) def getExchange(config): return config['Broker']['Exchange'] def getBrokerQueue(channel, exchange): channel.exchange_declare(exchange=exchange, type='topic') result = channel.queue_declare(exclusive=True) return result.method.queue # ------------------------------------------------------------- # Helper functions to format the output # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getNotificationFormat(): return "[{time}] <{project}/{group}> {text}" def getNotificationText(notification): text = notification['text'] if notification['link']: text += " — " + notification['link'] return text def formatNotification(notificationMessage): notification = json.loads(notificationMessage) return getNotificationFormat().format( time=time.strftime("%H:%M:%S"), project=notification['project'], group=notification['group'], text=getNotificationText(notification) ) # ------------------------------------------------------------- # Callbacks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def onBrokerMessage(channel, basic_deliver, properties, body): notification = formatNotification(body.decode("utf-8")) print(notification) channel.basic_ack(basic_deliver.delivery_tag) # ------------------------------------------------------------- # Services providers # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getConnection (config): try: return getBrokerConnection(config) except pika.exceptions.ProbableAccessDeniedError: print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr) sys.exit(2) def getChannel (config): return getConnection(config).channel() def getQueue (channel, exchange): try: return getBrokerQueue(channel, exchange) except pika.exceptions.ChannelClosed as e: print("Channel error: {0}".format(e)) sys.exit(4) # ------------------------------------------------------------- # Services container # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def initializeApplication (): app = {} app['config'] = getConfig() app['options'] = {} app['channel'] = getChannel(app['config']) app['exchange'] = getExchange(app['config']) app['queue'] = getQueue(app['channel'], app['exchange']) return app # ------------------------------------------------------------- # Main tasks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def parseArguments (options): #Todo: allows an option --routing-key <key> options['BindingKeys'] = ['#'] def subscribeToNotifications (options, channel, exchange, queue): for binding_key in options['BindingKeys']: subscribeToNotificationsForTopic(binding_key, channel, exchange, queue) def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue): try: channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key) except pika.exceptions.ChannelClosed as e: print("Channel error: {0}".format(e)) sys.exit(8) def consumeNotifications (channel, queue): channel.basic_consume(onBrokerMessage, queue=queue) channel.start_consuming() # ------------------------------------------------------------- # Procedural code # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - app = initializeApplication(); parseArguments(app['options']); subscribeToNotifications(app['options'], app['channel'], app['exchange'], app['queue']) consumeNotifications(app['channel'], app['queue'])
#!/usr/bin/env python3 # ------------------------------------------------------------- # Notifications center - CLI client # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # Author: Sébastien Santoro aka Dereckson # Project: Nasqueron # Created: 2016-01-27 # Description: Connects to the message broker, subscribes to # the notifications exchange, consumes messages, # prints them on the console # Dependencies: Pika, direct access to the broker # ------------------------------------------------------------- import configparser import json import pika import sys import time # ------------------------------------------------------------- # Helper functions to parse config and connect to the broker # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getConfig (): config = configparser.ConfigParser() config.read('/usr/local/etc/notifications.conf') return config def getCredentials(config): return pika.PlainCredentials( username=config['Broker']['User'], password=config['Broker']['Password'], erase_on_connect=True ) def getBrokerConnection(config): parameters = pika.ConnectionParameters( host=config['Broker']['Host'], virtual_host=config['Broker']['Vhost'], credentials=getCredentials(config) ) return pika.BlockingConnection(parameters) def getExchange(config): return config['Broker']['Exchange'] def getBrokerQueue(channel, exchange): channel.exchange_declare(exchange=exchange, type='topic') result = channel.queue_declare(exclusive=True) return result.method.queue # ------------------------------------------------------------- # Helper functions to format the output # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getNotificationFormat(): return "[{time}] <{project}/{group}> {text}" def getNotificationText(notification): text = notification['text'] if notification['link']: text += " — " + notification['link'] return text def formatNotification(notificationMessage): notification = json.loads(notificationMessage) return getNotificationFormat().format( time=time.strftime("%H:%M:%S"), project=notification['project'], group=notification['group'], text=getNotificationText(notification) ) # ------------------------------------------------------------- # Callbacks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def onBrokerMessage(channel, basic_deliver, properties, body): notification = formatNotification(body.decode("utf-8")) print(notification) channel.basic_ack(basic_deliver.delivery_tag) # ------------------------------------------------------------- # Services providers # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def getConnection (config): try: return getBrokerConnection(config) except pika.exceptions.ProbableAccessDeniedError: print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr) sys.exit(2) def getChannel (config): return getConnection(config).channel() def getQueue (channel, exchange): try: return getBrokerQueue(channel, exchange) except pika.exceptions.ChannelClosed as e: print("Channel error: {0}".format(e)) sys.exit(4) # ------------------------------------------------------------- # Services container # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def initializeApplication (): app = {} app['config'] = getConfig() app['options'] = {} app['channel'] = getChannel(app['config']) app['exchange'] = getExchange(app['config']) app['queue'] = getQueue(app['channel'], app['exchange']) return app # ------------------------------------------------------------- # Main tasks # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def parseArguments (options): #Todo: allows an option --routing-key <key> options['BindingKeys'] = ['#'] def subscribeToNotifications (options, channel, exchange, queue): for binding_key in options['BindingKeys']: subscribeToNotificationsForTopic(binding_key, channel, exchange, queue) def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue): try: channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key) except pika.exceptions.ChannelClosed as e: print("Channel error: {0}".format(e)) sys.exit(8) def consumeNotifications (channel, queue): channel.basic_consume(onBrokerMessage, queue=queue) channel.start_consuming() # ------------------------------------------------------------- # Procedural code # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - app = initializeApplication(); parseArguments(app['options']); subscribeToNotifications(app['options'], app['channel'], app['exchange'], app['queue']) consumeNotifications(app['channel'], app['queue'])
Continue