Page MenuHomeDevCentral

D299.id704.diff
No OneTemporary

D299.id704.diff

diff --git a/notifications b/notifications
--- a/notifications
+++ b/notifications
@@ -6,12 +6,13 @@
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
-# Description: Connects to the message broker, subscribes to
-# the notifications exchange, consumes messages,
-# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
+"""
+This module connects to the message broker, subscribes to the
+notifications exchange, consumes messages, prints them on the console.
+"""
import configparser
import json
@@ -27,12 +28,14 @@
def get_config():
+ """Get a parser to read default the configuration file."""
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
def get_credentials(config):
+ """Get credentials to connect to the broker from the configuration."""
return pika.PlainCredentials(
username=config['Broker']['User'],
password=config['Broker']['Password'],
@@ -41,6 +44,7 @@
def get_broker_connection(config):
+ """Connect to the broker."""
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
virtual_host=config['Broker']['Vhost'],
@@ -50,10 +54,12 @@
def get_exchange(config):
+ """Get exchange point name from the configuration."""
return config['Broker']['Exchange']
def get_broker_queue(channel, exchange):
+ """Ensure exchange exists and declare a temporary queue."""
channel.exchange_declare(exchange=exchange, type='topic')
result = channel.queue_declare(exclusive=True)
return result.method.queue
@@ -65,10 +71,12 @@
def get_notification_format():
+ """Get the format to use to print the notification."""
return "[{time}] <{project}/{group}> {text}"
def get_notification_text(notification):
+ """Append when needed the notification link to the text return a string."""
text = notification['text']
if notification['link']:
text += " — " + notification['link']
@@ -76,6 +84,7 @@
def format_notification(notification_message):
+ """Format the notification as a string from a JSON message."""
notification = json.loads(notification_message)
return get_notification_format().format(
time=time.strftime("%H:%M:%S"),
@@ -90,6 +99,7 @@
def on_broker_message(channel, basic_deliver, properties, body):
+ """Callback used when a new message have been received from the queue."""
notification = format_notification(body.decode("utf-8"))
print(notification)
@@ -104,6 +114,7 @@
def get_connection(config):
+ """Initialize and provide a connection to the broker"""
try:
return get_broker_connection(config)
except pika.exceptions.ProbableAccessDeniedError:
@@ -115,10 +126,12 @@
def get_channel(config):
+ """Initialize and provide a connection channel."""
return get_connection(config).channel()
def get_queue(channel, exchange):
+ """Initialize and provide a broker queue for specified exchange."""
try:
return get_broker_queue(channel, exchange)
except pika.exceptions.ChannelClosed as exception:
@@ -132,6 +145,7 @@
def initialize_application():
+ """Initialize a container with required services."""
container = {}
container['config'] = get_config()
@@ -149,16 +163,19 @@
def parse_arguments(options):
+ """Parse arguments and fill an options array."""
# Todo: allows an option --routing-key <key>
options['BindingKeys'] = ['#']
def subscribe_to_notifications(options, channel, exchange, queue):
+ """Subscribe to notifications for specified topics."""
for binding_key in options['BindingKeys']:
subscribe_to_topic(binding_key, channel, exchange, queue)
def subscribe_to_topic(binding_key, channel, exchange, queue):
+ """Subscribe to notifications for one specified topic."""
try:
channel.queue_bind(exchange=exchange,
queue=queue,
@@ -169,6 +186,9 @@
def consume_notifications(channel, queue):
+ """Consume notifications from a queue
+ and call our callback method when a message is received.
+ """
channel.basic_consume(on_broker_message, queue=queue)
channel.start_consuming()
@@ -179,6 +199,7 @@
def run_application():
+ """Run the application."""
app = initialize_application()
parse_arguments(app['options'])

File Metadata

Mime Type
text/plain
Expires
Thu, May 1, 09:40 (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2621118
Default Alt Text
D299.id704.diff (4 KB)

Event Timeline