Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F3748561
D265.id617.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Referenced Files
None
Subscribers
None
D265.id617.diff
View Options
diff --git a/notifications b/notifications
--- a/notifications
+++ b/notifications
@@ -12,21 +12,25 @@
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
+
import configparser
import json
import pika
import sys
import time
+
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConfig ():
+
+def getConfig():
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
+
def getCredentials(config):
return pika.PlainCredentials(
username=config['Broker']['User'],
@@ -34,6 +38,7 @@
erase_on_connect=True
)
+
def getBrokerConnection(config):
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
@@ -42,27 +47,33 @@
)
return pika.BlockingConnection(parameters)
+
def getExchange(config):
return config['Broker']['Exchange']
+
def getBrokerQueue(channel, exchange):
channel.exchange_declare(exchange=exchange, type='topic')
result = channel.queue_declare(exclusive=True)
return result.method.queue
+
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def getNotificationFormat():
return "[{time}] <{project}/{group}> {text}"
+
def getNotificationText(notification):
text = notification['text']
if notification['link']:
text += " — " + notification['link']
return text
+
def formatNotification(notificationMessage):
notification = json.loads(notificationMessage)
return getNotificationFormat().format(
@@ -76,37 +87,47 @@
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def onBrokerMessage(channel, basic_deliver, properties, body):
notification = formatNotification(body.decode("utf-8"))
print(notification)
channel.basic_ack(basic_deliver.delivery_tag)
+
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def getConnection (config):
+
+def getConnection(config):
try:
return getBrokerConnection(config)
except pika.exceptions.ProbableAccessDeniedError:
- print("Can't login to the broker: it's probably an access denied case.", file=sys.stderr)
+ print(
+ "Can't login to the broker: it's probably an access denied case.",
+ file=sys.stderr
+ )
sys.exit(2)
-def getChannel (config):
+
+def getChannel(config):
return getConnection(config).channel()
-def getQueue (channel, exchange):
+
+def getQueue(channel, exchange):
try:
return getBrokerQueue(channel, exchange)
except pika.exceptions.ChannelClosed as e:
print("Channel error: {0}".format(e))
sys.exit(4)
+
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def initializeApplication ():
+
+def initializeApplication():
app = {}
app['config'] = getConfig()
@@ -114,21 +135,25 @@
app['channel'] = getChannel(app['config'])
app['exchange'] = getExchange(app['config'])
app['queue'] = getQueue(app['channel'], app['exchange'])
-
+
return app
+
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def parseArguments (options):
- #Todo: allows an option --routing-key <key>
+
+def parseArguments(options):
+ # Todo: allows an option --routing-key <key>
options['BindingKeys'] = ['#']
-def subscribeToNotifications (options, channel, exchange, queue):
+
+def subscribeToNotifications(options, channel, exchange, queue):
for binding_key in options['BindingKeys']:
subscribeToNotificationsForTopic(binding_key, channel, exchange, queue)
+
def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue):
try:
channel.queue_bind(exchange=exchange,
@@ -138,15 +163,23 @@
print("Channel error: {0}".format(e))
sys.exit(8)
-def consumeNotifications (channel, queue):
+
+def consumeNotifications(channel, queue):
channel.basic_consume(onBrokerMessage, queue=queue)
channel.start_consuming()
+
# -------------------------------------------------------------
# Procedural code
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-app = initializeApplication();
-parseArguments(app['options']);
-subscribeToNotifications(app['options'], app['channel'], app['exchange'], app['queue'])
+
+app = initializeApplication()
+parseArguments(app['options'])
+subscribeToNotifications(
+ app['options'],
+ app['channel'],
+ app['exchange'],
+ app['queue']
+)
consumeNotifications(app['channel'], app['queue'])
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Nov 17, 09:31 (21 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2246507
Default Alt Text
D265.id617.diff (5 KB)
Attached To
Mode
D265: Follow PEP-8 style guide
Attached
Detach File
Event Timeline
Log In to Comment