Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F11722104
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/notifications b/notifications
index db5659b..1092a1e 100755
--- a/notifications
+++ b/notifications
@@ -1,216 +1,216 @@
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
"""
This module connects to the message broker, subscribes to the
notifications exchange, consumes messages, prints them on the console.
"""
import configparser
import json
import sys
import time
import pika
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_config():
    """Get a parser loaded from the default configuration file.

    Returns a configparser.ConfigParser that has been asked to read
    /usr/local/etc/notifications.conf.
    """
    parser = configparser.ConfigParser()
    parser.read('/usr/local/etc/notifications.conf')
    return parser
def get_credentials(config):
    """Build broker credentials from the configuration.

    Reads the 'User' and 'Password' entries of the 'Broker' section and
    wraps them in a pika.PlainCredentials created with
    erase_on_connect=True.
    """
    broker = config['Broker']
    return pika.PlainCredentials(
        username=broker['User'],
        password=broker['Password'],
        erase_on_connect=True,
    )
def get_broker_connection(config):
    """Open a blocking connection to the broker.

    The host and virtual host come from the 'Host' and 'Vhost' entries of
    the 'Broker' configuration section; credentials are obtained through
    get_credentials().
    """
    broker = config['Broker']
    connection_parameters = pika.ConnectionParameters(
        host=broker['Host'],
        virtual_host=broker['Vhost'],
        credentials=get_credentials(config),
    )
    return pika.BlockingConnection(connection_parameters)
def get_exchange(config):
    """Return the exchange name stored in the 'Broker' configuration section."""
    broker_section = config['Broker']
    return broker_section['Exchange']
def get_broker_queue(channel, exchange):
"""Ensure the exchange exists and declare a temporary queue.

Declares `exchange` as a 'topic' exchange on `channel`, then declares an
exclusive queue without giving it an explicit name and returns the queue
name reported in the declaration result (presumably generated by the
broker -- confirm against the pika documentation).

NOTE(review): this block is a diff hunk. The '-' line is the previous
declaration; the '+' line replaces it and additionally passes
durable=True.
"""
- channel.exchange_declare(exchange=exchange, type='topic')
+ channel.exchange_declare(exchange=exchange, type='topic', durable=True)
result = channel.queue_declare(exclusive=True)
return result.method.queue
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_notification_format():
    """Return the str.format() template used to print a notification.

    The template expects the named fields time, project, group and text.
    """
    template = "[{time}] <{project}/{group}> {text}"
    return template
def get_notification_text(notification):
    """Return the notification text, followed by its link when it has one.

    `notification` is a mapping with a mandatory 'text' entry and an
    optional 'link' entry. A missing, empty or None link leaves the text
    unchanged instead of raising KeyError.
    """
    text = notification['text']
    # The link is optional: use .get() so payloads without the key still print.
    link = notification.get('link')
    if link:
        return text + " — " + link
    return text
def format_notification(notification_message):
    """Turn a JSON notification message into a printable line.

    The message is decoded with json.loads(); its 'project' and 'group'
    entries, the text built by get_notification_text() and the current
    local time (HH:MM:SS) fill the template from get_notification_format().
    """
    payload = json.loads(notification_message)
    fields = {
        'time': time.strftime("%H:%M:%S"),
        'project': payload['project'],
        'group': payload['group'],
        'text': get_notification_text(payload),
    }
    return get_notification_format().format(**fields)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def on_broker_message(channel, basic_deliver, properties, body):
    """Callback invoked when a new message has been received from the queue.

    Decodes the body as UTF-8, prints the formatted notification on
    standard output (flushed immediately), then acknowledges the delivery
    on the channel.
    """
    message = body.decode("utf-8")
    print(format_notification(message), flush=True)
    channel.basic_ack(basic_deliver.delivery_tag)
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_connection(config):
    """Initialize and provide a connection to the broker.

    When pika raises ProbableAccessDeniedError, an error message is
    written to standard error and the process exits with status 2.
    """
    try:
        connection = get_broker_connection(config)
    except pika.exceptions.ProbableAccessDeniedError:
        message = (
            "Can't login to the broker: it's probably an access denied case."
        )
        print(message, file=sys.stderr)
        sys.exit(2)
    return connection
def get_channel(config):
    """Open a broker connection and return a channel created on it."""
    connection = get_connection(config)
    return connection.channel()
def get_queue(channel, exchange):
    """Initialize and provide a broker queue for the specified exchange.

    Returns the queue name given by get_broker_queue(). When the channel
    is closed during the declaration (pika ChannelClosed), the error is
    reported and the process exits with status 4.
    """
    try:
        return get_broker_queue(channel, exchange)
    except pika.exceptions.ChannelClosed as exception:
        # Report on stderr, like get_connection(), so that stdout only
        # carries notifications.
        print("Channel error: {0}".format(exception), file=sys.stderr)
        sys.exit(4)
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def initialize_application():
    """Build the services container used by the application.

    Returns a dict with the keys 'config', 'options' (an empty dict),
    'channel', 'exchange' and 'queue'.
    """
    config = get_config()
    channel = get_channel(config)
    exchange = get_exchange(config)
    queue = get_queue(channel, exchange)
    return {
        'config': config,
        'options': {},
        'channel': channel,
        'exchange': exchange,
        'queue': queue,
    }
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def parse_arguments(options):
    """Fill the options mapping in place.

    No command-line argument is read yet: 'BindingKeys' is always set to
    a list holding the single binding key '#'.
    """
    # Todo: allows an option --routing-key <key>
    default_binding_keys = ['#']
    options['BindingKeys'] = default_binding_keys
def subscribe_to_notifications(options, channel, exchange, queue):
    """Bind the queue to the exchange for every key in options['BindingKeys']."""
    binding_keys = options['BindingKeys']
    for key in binding_keys:
        subscribe_to_topic(key, channel, exchange, queue)
def subscribe_to_topic(binding_key, channel, exchange, queue):
    """Subscribe to notifications for one specified topic.

    Binds `queue` to `exchange` on `channel`, using `binding_key` as the
    routing key. When the channel is closed during the binding (pika
    ChannelClosed), the error is reported and the process exits with
    status 8.
    """
    try:
        channel.queue_bind(exchange=exchange,
                           queue=queue,
                           routing_key=binding_key)
    except pika.exceptions.ChannelClosed as exception:
        # Report on stderr, like get_connection(), so that stdout only
        # carries notifications.
        print("Channel error: {0}".format(exception), file=sys.stderr)
        sys.exit(8)
def consume_notifications(channel, queue):
    """Start consuming notifications from a queue.

    Registers on_broker_message as the callback for messages received on
    `queue`, then hands control to the channel's consuming loop.
    """
    callback = on_broker_message
    channel.basic_consume(callback, queue=queue)
    channel.start_consuming()
# -------------------------------------------------------------
# Application entry point
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def run_application():
    """Run the application.

    Builds the services container, fills the options, binds the queue
    for each binding key, then consumes notifications.
    """
    app = initialize_application()
    options, channel, queue = app['options'], app['channel'], app['queue']
    parse_arguments(options)
    subscribe_to_notifications(options, channel, app['exchange'], queue)
    consume_notifications(channel, queue)
# Only start the client when executed as a script, so that importing this
# module does not connect to the broker as a side effect.
if __name__ == "__main__":
    run_application()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Sep 18, 01:00 (21 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2986236
Default Alt Text
(6 KB)
Attached To
Mode
rNOTIFCLI Notifications center CLI client
Attached
Detach File
Event Timeline
Log In to Comment