Page MenuHomeDevCentral

No OneTemporary

diff --git a/notifications b/notifications
new file mode 100755
index 0000000..d733490
--- /dev/null
+++ b/notifications
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+
+# -------------------------------------------------------------
+# Notifications center - CLI client
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Author: Sébastien Santoro aka Dereckson
+# Project: Nasqueron
+# Created: 2016-01-27
+# Description: Connects to the message broker, subscribes to
+# the notifications exchange, consumes messages,
+# prints them on the console
+# Dependencies: Pika, direct access to the broker
+# -------------------------------------------------------------
+
+import configparser
+import json
+import pika
+import sys
+import time
+
+# -------------------------------------------------------------
+# Helper functions to parse config and connect to the broker
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def getConfig():
+    """Read the client configuration from /usr/local/etc/notifications.conf.
+
+    Returns:
+        The configparser.ConfigParser instance after reading that path.
+    """
+    parser = configparser.ConfigParser()
+    parser.read('/usr/local/etc/notifications.conf')
+    return parser
+
+def getCredentials(config):
+    """Build the broker credentials from the 'Broker' configuration section.
+
+    Args:
+        config: Mapping whose 'Broker' section provides 'User' and 'Password'.
+
+    Returns:
+        A pika.PlainCredentials object created with erase_on_connect=True.
+    """
+    broker = config['Broker']
+    user = broker['User']
+    secret = broker['Password']
+    return pika.PlainCredentials(
+        username=user,
+        password=secret,
+        erase_on_connect=True
+    )
+
+def getBrokerConnection(config):
+    """Open a blocking connection to the broker named in the configuration.
+
+    Args:
+        config: Mapping whose 'Broker' section provides 'Host' and 'Vhost',
+            plus whatever getCredentials() reads from it.
+
+    Returns:
+        A pika.BlockingConnection built from those parameters.
+    """
+    broker = config['Broker']
+    return pika.BlockingConnection(
+        pika.ConnectionParameters(
+            host=broker['Host'],
+            virtual_host=broker['Vhost'],
+            credentials=getCredentials(config)
+        )
+    )
+
+def getExchange(config):
+    """Return the exchange name stored under 'Broker' / 'Exchange'.
+
+    Args:
+        config: Mapping with a 'Broker' section containing 'Exchange'.
+    """
+    broker = config['Broker']
+    return broker['Exchange']
+
+def getBrokerQueue(channel, exchange):
+    """Declare the topic exchange and an exclusive queue for this client.
+
+    Args:
+        channel: Broker channel on which the declarations are issued.
+        exchange: Name of the exchange, declared with type 'topic'.
+
+    Returns:
+        The queue name read from the queue_declare() result.
+    """
+    # 'exchange_type' is the supported keyword; the older 'type' alias is
+    # deprecated and no longer accepted by pika 1.0 and later.
+    channel.exchange_declare(exchange=exchange, exchange_type='topic')
+    # An empty name lets the broker pick one. Passing it explicitly keeps
+    # the call valid on pika versions where 'queue' is a required argument.
+    result = channel.queue_declare(queue='', exclusive=True)
+    return result.method.queue
+
+# -------------------------------------------------------------
+# Helper functions to format the output
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def getNotificationFormat():
+    """Return the str.format() template used to print one notification.
+
+    The template has four fields: time, project, group and text.
+    """
+    template = "[{time}] <{project}/{group}> {text}"
+    return template
+
+def getNotificationText(notification):
+    """Return the notification text, followed by its link when there is one.
+
+    Args:
+        notification: Mapping with a 'text' entry and an optional 'link'.
+
+    Returns:
+        The text alone when the link is missing or empty, otherwise the
+        text, an em dash separator and the link.
+
+    Raises:
+        KeyError: If the notification has no 'text' entry.
+    """
+    text = notification['text']
+    # .get() treats a payload without a 'link' key like one with an empty
+    # link, instead of raising KeyError.
+    link = notification.get('link')
+    if link:
+        text += " — " + link
+    return text
+
+def formatNotification(notificationMessage):
+    """Render a JSON-encoded notification as a single console line.
+
+    Args:
+        notificationMessage: JSON document with 'project' and 'group'
+            entries, plus the entries read by getNotificationText().
+
+    Returns:
+        The template from getNotificationFormat(), filled with the current
+        local time (HH:MM:SS), the project, the group and the text.
+    """
+    template = getNotificationFormat()
+    payload = json.loads(notificationMessage)
+    fields = {
+        'time': time.strftime("%H:%M:%S"),
+        'project': payload['project'],
+        'group': payload['group'],
+        'text': getNotificationText(payload),
+    }
+    return template.format(**fields)
+
+# -------------------------------------------------------------
+# Callbacks
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def onBrokerMessage(channel, basic_deliver, properties, body):
+    """Print a received notification, then acknowledge it to the broker.
+
+    Args:
+        channel: Channel the message arrived on; used to send the ack.
+        basic_deliver: Delivery information; its delivery_tag is acked.
+        properties: Message properties (not used here).
+        body: Raw message bytes, decoded as UTF-8 before formatting.
+    """
+    payload = body.decode("utf-8")
+    print(formatNotification(payload))
+    # Acknowledge only after the notification has been printed.
+    channel.basic_ack(basic_deliver.delivery_tag)
+
+# -------------------------------------------------------------
+# Service providers
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def getConnection(config):
+    """Connect to the broker, exiting with status 2 when access is denied.
+
+    Args:
+        config: Configuration passed through to getBrokerConnection().
+
+    Returns:
+        The connection returned by getBrokerConnection().
+    """
+    try:
+        connection = getBrokerConnection(config)
+    except pika.exceptions.ProbableAccessDeniedError:
+        print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr)
+        sys.exit(2)
+    return connection
+
+def getChannel(config):
+    """Return a channel opened on a new connection from getConnection().
+
+    Args:
+        config: Configuration passed through to getConnection().
+    """
+    connection = getConnection(config)
+    return connection.channel()
+
+def getQueue(channel, exchange):
+    """Declare the client queue, exiting with status 4 if the channel closes.
+
+    Args:
+        channel: Broker channel passed through to getBrokerQueue().
+        exchange: Exchange name passed through to getBrokerQueue().
+
+    Returns:
+        The queue name returned by getBrokerQueue().
+    """
+    try:
+        return getBrokerQueue(channel, exchange)
+    except pika.exceptions.ChannelClosed as e:
+        # Errors go to stderr, as in getConnection(), so stdout carries
+        # nothing but notifications.
+        print("Channel error: {0}".format(e), file=sys.stderr)
+        sys.exit(4)
+
+# -------------------------------------------------------------
+# Service container
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def initializeApplication():
+    """Create the services used by the script and return them in a dict.
+
+    Returns:
+        A dict with the keys 'config', 'options' (initially empty),
+        'channel', 'exchange' and 'queue'.
+    """
+    config = getConfig()
+    channel = getChannel(config)
+    exchange = getExchange(config)
+    queue = getQueue(channel, exchange)
+
+    return {
+        'config': config,
+        'options': {},
+        'channel': channel,
+        'exchange': exchange,
+        'queue': queue,
+    }
+
+# -------------------------------------------------------------
+# Main tasks
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+def parseArguments(options):
+    """Store the binding keys to subscribe to in the options dict, in place.
+
+    Args:
+        options: Dict that receives a 'BindingKeys' list; for now always
+            the single catch-all key '#'.
+    """
+    # TODO: allow an option --routing-key <key>
+    options.update({'BindingKeys': ['#']})
+
+def subscribeToNotifications(options, channel, exchange, queue):
+    """Bind the queue to the exchange once per configured binding key.
+
+    Args:
+        options: Dict whose 'BindingKeys' entry lists the keys to bind.
+        channel: Broker channel used for the bindings.
+        exchange: Name of the exchange to bind to.
+        queue: Name of the queue being bound.
+    """
+    keys = options['BindingKeys']
+    for key in keys:
+        subscribeToNotificationsForTopic(key, channel, exchange, queue)
+
+def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue):
+    """Bind the queue to the exchange for a single binding key.
+
+    Exits with status 8 if the channel is closed during the bind.
+
+    Args:
+        binding_key: Routing key pattern passed to queue_bind().
+        channel: Broker channel used for the binding.
+        exchange: Name of the exchange to bind to.
+        queue: Name of the queue being bound.
+    """
+    try:
+        channel.queue_bind(exchange=exchange,
+                           queue=queue,
+                           routing_key=binding_key)
+    except pika.exceptions.ChannelClosed as e:
+        # Errors go to stderr, as in getConnection(), so stdout carries
+        # nothing but notifications.
+        print("Channel error: {0}".format(e), file=sys.stderr)
+        sys.exit(8)
+
+def consumeNotifications(channel, queue):
+    """Register onBrokerMessage as the queue consumer and start consuming.
+
+    Args:
+        channel: Broker channel to consume on.
+        queue: Name of the queue to consume from.
+    """
+    # NOTE(review): the callback-first argument order looks like the
+    # pre-1.0 pika basic_consume() signature; confirm against the
+    # installed pika version.
+    channel.basic_consume(onBrokerMessage, queue=queue)
+    channel.start_consuming()
+
+# -------------------------------------------------------------
+# Procedural code
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+# Build the services, fill in the options, bind the queue and start consuming.
+app = initializeApplication()
+parseArguments(app['options'])
+subscribeToNotifications(
+    app['options'],
+    app['channel'],
+    app['exchange'],
+    app['queue']
+)
+consumeNotifications(app['channel'], app['queue'])

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jan 28, 09:12 (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2380237
Default Alt Text
(5 KB)

Event Timeline