Page Menu · Home · DevCentral

No One · Temporary

diff --git a/notifications b/notifications
index d733490..1adca28 100755
--- a/notifications
+++ b/notifications
@@ -1,152 +1,185 @@
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Description: Connects to the message broker, subscribes to
# the notifications exchange, consumes messages,
# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
+
import configparser
import json
import pika
import sys
import time
+
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConfig ():
+
+def getConfig():
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
+
def getCredentials(config):
return pika.PlainCredentials(
username=config['Broker']['User'],
password=config['Broker']['Password'],
erase_on_connect=True
)
+
def getBrokerConnection(config):
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
virtual_host=config['Broker']['Vhost'],
credentials=getCredentials(config)
)
return pika.BlockingConnection(parameters)
+
def getExchange(config):
return config['Broker']['Exchange']
+
def getBrokerQueue(channel, exchange):
channel.exchange_declare(exchange=exchange, type='topic')
result = channel.queue_declare(exclusive=True)
return result.method.queue
+
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def getNotificationFormat():
return "[{time}] <{project}/{group}> {text}"
+
def getNotificationText(notification):
text = notification['text']
if notification['link']:
text += " — " + notification['link']
return text
+
def formatNotification(notificationMessage):
notification = json.loads(notificationMessage)
return getNotificationFormat().format(
time=time.strftime("%H:%M:%S"),
project=notification['project'],
group=notification['group'],
text=getNotificationText(notification)
)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def onBrokerMessage(channel, basic_deliver, properties, body):
notification = formatNotification(body.decode("utf-8"))
print(notification)
channel.basic_ack(basic_deliver.delivery_tag)
+
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConnection (config):
+
+def getConnection(config):
try:
return getBrokerConnection(config)
except pika.exceptions.ProbableAccessDeniedError:
- print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr)
+ print(
+ "Can't login to the broker: it's probably an access denied case.",
+ file=sys.stderr
+ )
sys.exit(2)
-def getChannel (config):
+
+def getChannel(config):
return getConnection(config).channel()
-def getQueue (channel, exchange):
+
+def getQueue(channel, exchange):
try:
return getBrokerQueue(channel, exchange)
except pika.exceptions.ChannelClosed as e:
print("Channel error: {0}".format(e))
sys.exit(4)
+
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def initializeApplication ():
+
+def initializeApplication():
app = {}
app['config'] = getConfig()
app['options'] = {}
app['channel'] = getChannel(app['config'])
app['exchange'] = getExchange(app['config'])
app['queue'] = getQueue(app['channel'], app['exchange'])
-
+
return app
+
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def parseArguments (options):
- #Todo: allows an option --routing-key <key>
+
+def parseArguments(options):
+ # TODO: allow an option --routing-key <key>
options['BindingKeys'] = ['#']
-def subscribeToNotifications (options, channel, exchange, queue):
+
+def subscribeToNotifications(options, channel, exchange, queue):
for binding_key in options['BindingKeys']:
subscribeToNotificationsForTopic(binding_key, channel, exchange, queue)
+
def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue):
try:
channel.queue_bind(exchange=exchange,
queue=queue,
routing_key=binding_key)
except pika.exceptions.ChannelClosed as e:
print("Channel error: {0}".format(e))
sys.exit(8)
-def consumeNotifications (channel, queue):
+
+def consumeNotifications(channel, queue):
channel.basic_consume(onBrokerMessage, queue=queue)
channel.start_consuming()
+
# -------------------------------------------------------------
# Procedural code
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-app = initializeApplication();
-parseArguments(app['options']);
-subscribeToNotifications(app['options'], app['channel'], app['exchange'], app['queue'])
+
+app = initializeApplication()
+parseArguments(app['options'])
+subscribeToNotifications(
+ app['options'],
+ app['channel'],
+ app['exchange'],
+ app['queue']
+)
consumeNotifications(app['channel'], app['queue'])

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jan 28, 09:12 (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2380239
Default Alt Text
(6 KB)

Event Timeline