Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F7648878
D299.id704.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
D299.id704.diff
View Options
diff --git a/notifications b/notifications
--- a/notifications
+++ b/notifications
@@ -6,12 +6,13 @@
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
-# Description: Connects to the message broker, subscribes to
-# the notifications exchange, consumes messages,
-# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
+"""
+This module connects to the message broker, subscribes to the
+notifications exchange, consumes messages, prints them on the console.
+"""
import configparser
import json
@@ -27,12 +28,14 @@
def get_config():
+ """Get a parser to read default the configuration file."""
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
def get_credentials(config):
+ """Get credentials to connect to the broker from the configuration."""
return pika.PlainCredentials(
username=config['Broker']['User'],
password=config['Broker']['Password'],
@@ -41,6 +44,7 @@
def get_broker_connection(config):
+ """Connect to the broker."""
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
virtual_host=config['Broker']['Vhost'],
@@ -50,10 +54,12 @@
def get_exchange(config):
+ """Get exchange point name from the configuration."""
return config['Broker']['Exchange']
def get_broker_queue(channel, exchange):
+ """Ensure exchange exists and declare a temporary queue."""
channel.exchange_declare(exchange=exchange, type='topic')
result = channel.queue_declare(exclusive=True)
return result.method.queue
@@ -65,10 +71,12 @@
def get_notification_format():
+ """Get the format to use to print the notification."""
return "[{time}] <{project}/{group}> {text}"
def get_notification_text(notification):
+ """Append when needed the notification link to the text return a string."""
text = notification['text']
if notification['link']:
text += " — " + notification['link']
@@ -76,6 +84,7 @@
def format_notification(notification_message):
+ """Format the notification as a string from a JSON message."""
notification = json.loads(notification_message)
return get_notification_format().format(
time=time.strftime("%H:%M:%S"),
@@ -90,6 +99,7 @@
def on_broker_message(channel, basic_deliver, properties, body):
+ """Callback used when a new message have been received from the queue."""
notification = format_notification(body.decode("utf-8"))
print(notification)
@@ -104,6 +114,7 @@
def get_connection(config):
+ """Initialize and provide a connection to the broker"""
try:
return get_broker_connection(config)
except pika.exceptions.ProbableAccessDeniedError:
@@ -115,10 +126,12 @@
def get_channel(config):
+ """Initialize and provide a connection channel."""
return get_connection(config).channel()
def get_queue(channel, exchange):
+ """Initialize and provide a broker queue for specified exchange."""
try:
return get_broker_queue(channel, exchange)
except pika.exceptions.ChannelClosed as exception:
@@ -132,6 +145,7 @@
def initialize_application():
+ """Initialize a container with required services."""
container = {}
container['config'] = get_config()
@@ -149,16 +163,19 @@
def parse_arguments(options):
+ """Parse arguments and fill an options array."""
# Todo: allows an option --routing-key <key>
options['BindingKeys'] = ['#']
def subscribe_to_notifications(options, channel, exchange, queue):
+ """Subscribe to notifications for specified topics."""
for binding_key in options['BindingKeys']:
subscribe_to_topic(binding_key, channel, exchange, queue)
def subscribe_to_topic(binding_key, channel, exchange, queue):
+ """Subscribe to notifications for one specified topic."""
try:
channel.queue_bind(exchange=exchange,
queue=queue,
@@ -169,6 +186,9 @@
def consume_notifications(channel, queue):
+ """Consume notifications from a queue
+ and call our callback method when a message is received.
+ """
channel.basic_consume(on_broker_message, queue=queue)
channel.start_consuming()
@@ -179,6 +199,7 @@
def run_application():
+ """Run the application."""
app = initialize_application()
parse_arguments(app['options'])
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, May 1, 09:40 (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2621118
Default Alt Text
D299.id704.diff (4 KB)
Attached To
Mode
D299: Code documentation
Attached
Detach File
Event Timeline
Log In to Comment