Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F4060959
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/notifications b/notifications
index d733490..1adca28 100755
--- a/notifications
+++ b/notifications
@@ -1,152 +1,185 @@
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Description: Connects to the message broker, subscribes to
# the notifications exchange, consumes messages,
# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
+
import configparser
import json
import pika
import sys
import time
+
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConfig ():
+
+def getConfig():
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
+
def getCredentials(config):
return pika.PlainCredentials(
username=config['Broker']['User'],
password=config['Broker']['Password'],
erase_on_connect=True
)
+
def getBrokerConnection(config):
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
virtual_host=config['Broker']['Vhost'],
credentials=getCredentials(config)
)
return pika.BlockingConnection(parameters)
+
def getExchange(config):
return config['Broker']['Exchange']
+
def getBrokerQueue(channel, exchange):
channel.exchange_declare(exchange=exchange, type='topic')
result = channel.queue_declare(exclusive=True)
return result.method.queue
+
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def getNotificationFormat():
return "[{time}] <{project}/{group}> {text}"
+
def getNotificationText(notification):
text = notification['text']
if notification['link']:
text += " — " + notification['link']
return text
+
def formatNotification(notificationMessage):
notification = json.loads(notificationMessage)
return getNotificationFormat().format(
time=time.strftime("%H:%M:%S"),
project=notification['project'],
group=notification['group'],
text=getNotificationText(notification)
)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def onBrokerMessage(channel, basic_deliver, properties, body):
notification = formatNotification(body.decode("utf-8"))
print(notification)
channel.basic_ack(basic_deliver.delivery_tag)
+
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConnection (config):
+
+def getConnection(config):
try:
return getBrokerConnection(config)
except pika.exceptions.ProbableAccessDeniedError:
- print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr)
+ print(
+ "Can't login to the broker: it's probably an access denied case.",
+ file=sys.stderr
+ )
sys.exit(2)
-def getChannel (config):
+
+def getChannel(config):
return getConnection(config).channel()
-def getQueue (channel, exchange):
+
+def getQueue(channel, exchange):
try:
return getBrokerQueue(channel, exchange)
except pika.exceptions.ChannelClosed as e:
print("Channel error: {0}".format(e))
sys.exit(4)
+
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def initializeApplication ():
+
+def initializeApplication():
app = {}
app['config'] = getConfig()
app['options'] = {}
app['channel'] = getChannel(app['config'])
app['exchange'] = getExchange(app['config'])
app['queue'] = getQueue(app['channel'], app['exchange'])
-
+
return app
+
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def parseArguments (options):
- #Todo: allows an option --routing-key <key>
+
+def parseArguments(options):
+ # TODO: allow a --routing-key <key> option
options['BindingKeys'] = ['#']
-def subscribeToNotifications (options, channel, exchange, queue):
+
+def subscribeToNotifications(options, channel, exchange, queue):
for binding_key in options['BindingKeys']:
subscribeToNotificationsForTopic(binding_key, channel, exchange, queue)
+
def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue):
try:
channel.queue_bind(exchange=exchange,
queue=queue,
routing_key=binding_key)
except pika.exceptions.ChannelClosed as e:
print("Channel error: {0}".format(e))
sys.exit(8)
-def consumeNotifications (channel, queue):
+
+def consumeNotifications(channel, queue):
channel.basic_consume(onBrokerMessage, queue=queue)
channel.start_consuming()
+
# -------------------------------------------------------------
# Procedural code
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-app = initializeApplication();
-parseArguments(app['options']);
-subscribeToNotifications(app['options'], app['channel'], app['exchange'], app['queue'])
+
+app = initializeApplication()
+parseArguments(app['options'])
+subscribeToNotifications(
+ app['options'],
+ app['channel'],
+ app['exchange'],
+ app['queue']
+)
consumeNotifications(app['channel'], app['queue'])
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Jan 28, 09:12 (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2380239
Default Alt Text
(6 KB)
Attached To
Mode
rNOTIFCLI Notifications center CLI client
Attached
Detach File
Event Timeline
Log In to Comment