Page MenuHomeDevCentral

notifications
No OneTemporary

notifications

#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
"""
This module connects to the message broker, subscribes to the
notifications exchange, consumes messages, prints them on the console.
"""
import configparser
import json
import sys
import time
import pika
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_config():
"""Get a parser to read default the configuration file."""
config = configparser.ConfigParser()
config.read('/usr/local/etc/notifications.conf')
return config
def get_credentials(config):
"""Get credentials to connect to the broker from the configuration."""
return pika.PlainCredentials(
username=config['Broker']['User'],
password=config['Broker']['Password'],
erase_on_connect=True
)
def get_broker_connection(config):
"""Connect to the broker."""
parameters = pika.ConnectionParameters(
host=config['Broker']['Host'],
virtual_host=config['Broker']['Vhost'],
credentials=get_credentials(config)
)
return pika.BlockingConnection(parameters)
def get_exchange(config):
"""Get exchange point name from the configuration."""
return config['Broker']['Exchange']
def get_broker_queue(channel, exchange):
"""Ensure exchange exists and declare a temporary queue."""
channel.exchange_declare(
exchange=exchange, exchange_type='topic', durable=True)
result = channel.queue_declare(exclusive=True)
return result.method.queue
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_notification_format():
"""Get the format to use to print the notification."""
return "[{time}] <{project}/{group}> {text}"
def get_notification_text(notification):
"""Append when needed the notification link to the text return a string."""
text = notification['text']
if notification['link']:
text += " — " + notification['link']
return text
def format_notification(notification_message):
"""Format the notification as a string from a JSON message."""
notification = json.loads(notification_message)
return get_notification_format().format(
time=time.strftime("%b %d %H:%M:%S"),
project=notification['project'],
group=notification['group'],
text=get_notification_text(notification)
)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def on_broker_message(channel, basic_deliver, properties, body):
"""Callback used when a new message have been received from the queue."""
notification = format_notification(body.decode("utf-8"))
print(notification)
sys.stdout.flush()
channel.basic_ack(basic_deliver.delivery_tag)
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_connection(config):
"""Initialize and provide a connection to the broker"""
try:
return get_broker_connection(config)
except pika.exceptions.ProbableAccessDeniedError:
print(
"Can't login to the broker: it's probably an access denied case.",
file=sys.stderr
)
sys.exit(2)
def get_channel(config):
"""Initialize and provide a connection channel."""
return get_connection(config).channel()
def get_queue(channel, exchange):
"""Initialize and provide a broker queue for specified exchange."""
try:
return get_broker_queue(channel, exchange)
except pika.exceptions.ChannelClosed as exception:
print("Channel error: {0}".format(exception))
sys.exit(4)
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def initialize_application():
"""Initialize a container with required services."""
container = {}
container['config'] = get_config()
container['options'] = {}
container['channel'] = get_channel(container['config'])
container['exchange'] = get_exchange(container['config'])
container['queue'] = get_queue(container['channel'], container['exchange'])
return container
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def parse_arguments(options):
"""Parse arguments and fill an options array."""
# Todo: allows an option --routing-key <key>
options['BindingKeys'] = ['#']
def subscribe_to_notifications(options, channel, exchange, queue):
"""Subscribe to notifications for specified topics."""
for binding_key in options['BindingKeys']:
subscribe_to_topic(binding_key, channel, exchange, queue)
def subscribe_to_topic(binding_key, channel, exchange, queue):
"""Subscribe to notifications for one specified topic."""
try:
channel.queue_bind(exchange=exchange,
queue=queue,
routing_key=binding_key)
except pika.exceptions.ChannelClosed as exception:
print("Channel error: {0}".format(exception))
sys.exit(8)
def consume_notifications(channel, queue):
"""Consume notifications from a queue
and call our callback method when a message is received.
"""
channel.basic_consume(on_broker_message, queue=queue)
channel.start_consuming()
# -------------------------------------------------------------
# Application entry point
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def run_application():
"""Run the application."""
app = initialize_application()
parse_arguments(app['options'])
subscribe_to_notifications(
app['options'],
app['channel'],
app['exchange'],
app['queue']
)
consume_notifications(app['channel'], app['queue'])
if __name__ == "__main__":
run_application()

File Metadata

Mime Type
text/x-python
Expires
Mon, Jun 16, 00:17 (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2733647
Default Alt Text
notifications (6 KB)

Event Timeline