Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F9790286
notifications
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
notifications
View Options
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
"""
This module connects to the message broker, subscribes to the
notifications exchange, consumes messages, and prints them on the console.
"""
import
configparser
import
json
import
sys
import
time
import
pika
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_config(path='/usr/local/etc/notifications.conf'):
    """Get a parser loaded with the configuration file.

    Args:
        path: Location of the configuration file to read. Defaults to
            the system-wide notifications configuration file.

    Returns:
        A configparser.ConfigParser populated from ``path``. A file that
        cannot be opened is ignored by ConfigParser.read, in which case
        the returned parser has no sections.
    """
    config = configparser.ConfigParser()
    config.read(path)
    return config
def get_credentials(config):
    """Build the broker credentials from the configuration.

    Args:
        config: Mapping-like configuration exposing a 'Broker' section
            with 'User' and 'Password' entries.

    Returns:
        A pika.PlainCredentials instance created with
        erase_on_connect=True.
    """
    broker = config['Broker']
    return pika.PlainCredentials(
        username=broker['User'],
        password=broker['Password'],
        erase_on_connect=True,
    )
def get_broker_connection(config):
    """Open a blocking connection to the broker.

    Args:
        config: Mapping-like configuration exposing a 'Broker' section
            with 'Host' and 'Vhost' entries (plus whatever
            get_credentials reads).

    Returns:
        A pika.BlockingConnection built from those parameters.
    """
    broker = config['Broker']
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host=broker['Host'],
            virtual_host=broker['Vhost'],
            credentials=get_credentials(config),
        )
    )
def get_exchange(config):
    """Read the exchange point name from the configuration.

    Args:
        config: Mapping-like configuration exposing a 'Broker' section.

    Returns:
        The value of the 'Exchange' entry of the 'Broker' section.
    """
    broker_section = config['Broker']
    return broker_section['Exchange']
def get_broker_queue(channel, exchange):
    """Ensure the exchange exists and declare a temporary queue.

    Args:
        channel: Broker channel offering exchange_declare and
            queue_declare (a pika channel in this application).
        exchange: Name of the topic exchange to declare.

    Returns:
        The queue name reported by the broker in the declare reply.
    """
    channel.exchange_declare(
        exchange=exchange,
        exchange_type='topic',
        durable=True,
    )
    # The queue name is passed explicitly: pika >= 1.0 made it a required
    # argument, while older releases already defaulted it to ''. An empty
    # name lets the broker pick one for this exclusive queue.
    result = channel.queue_declare(queue='', exclusive=True)
    return result.method.queue
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_notification_format():
    """Return the str.format template used to print one notification.

    The template expects the named fields time, project, group and text.
    """
    template = "[{time}] <{project}/{group}> {text}"
    return template
def get_notification_text(notification):
    """Return the notification text, followed by its link when it has one.

    Args:
        notification: Mapping with a 'text' entry and an optional 'link'
            entry.

    Returns:
        The text alone when the link is missing or empty, otherwise the
        text and the link separated by " — ".
    """
    text = notification['text']
    # A payload without any 'link' key is treated like an empty link
    # instead of raising KeyError.
    link = notification.get('link')
    if link:
        text += " — " + link
    return text
def format_notification(notification_message):
    """Turn a JSON notification message into a printable line.

    Args:
        notification_message: JSON document (string) holding at least
            the 'project' and 'group' entries, plus what
            get_notification_text reads.

    Returns:
        The notification rendered with the template from
        get_notification_format, stamped with the current local time.
    """
    payload = json.loads(notification_message)
    template = get_notification_format()
    fields = {
        'time': time.strftime("%b %d %H:%M:%S"),
        'project': payload['project'],
        'group': payload['group'],
        'text': get_notification_text(payload),
    }
    return template.format(**fields)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def on_broker_message(channel, basic_deliver, properties, body):
    """Handle one message delivered from the queue.

    Decodes the body as UTF-8, prints the formatted notification on the
    standard output (flushed immediately), then acknowledges the delivery.

    Args:
        channel: Channel the message arrived on; used to acknowledge it.
        basic_deliver: Delivery information carrying the delivery_tag.
        properties: Message properties (unused).
        body: Raw message payload as bytes.
    """
    message = body.decode("utf-8")
    print(format_notification(message), flush=True)
    channel.basic_ack(basic_deliver.delivery_tag)
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_connection(config):
    """Provide a connection to the broker, or exit on access denied.

    Args:
        config: Configuration handed over to get_broker_connection.

    Returns:
        The connection returned by get_broker_connection.

    Raises:
        SystemExit: With status 2 when pika reports a probable access
            denied error; a message is printed on stderr first.
    """
    try:
        connection = get_broker_connection(config)
    except pika.exceptions.ProbableAccessDeniedError:
        print(
            "Can't login to the broker: it's probably an access denied case.",
            file=sys.stderr,
        )
        sys.exit(2)
    else:
        return connection
def get_channel(config):
    """Provide a channel opened on a fresh broker connection.

    Args:
        config: Configuration handed over to get_connection.

    Returns:
        The result of calling channel() on that connection.
    """
    connection = get_connection(config)
    return connection.channel()
def get_queue(channel, exchange):
    """Provide a broker queue for the specified exchange.

    Args:
        channel: Broker channel handed over to get_broker_queue.
        exchange: Name of the exchange the queue is declared for.

    Returns:
        The queue name returned by get_broker_queue.

    Raises:
        SystemExit: With status 4 when the channel has been closed.
    """
    try:
        return get_broker_queue(channel, exchange)
    except pika.exceptions.ChannelClosed as exception:
        # Errors go to stderr, like in get_connection, so they never mix
        # with the notifications printed on stdout.
        print("Channel error: {0}".format(exception), file=sys.stderr)
        sys.exit(4)
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def initialize_application():
    """Build the services container used by the application.

    Returns:
        A dict with the keys 'config', 'options' (an empty dict),
        'channel', 'exchange' and 'queue'.
    """
    config = get_config()
    channel = get_channel(config)
    exchange = get_exchange(config)
    queue = get_queue(channel, exchange)
    return {
        'config': config,
        'options': {},
        'channel': channel,
        'exchange': exchange,
        'queue': queue,
    }
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def parse_arguments(options):
    """Fill the options dict in place.

    For now no command line argument is read: the binding keys are
    always set to the catch-all '#' pattern.

    Args:
        options: Dict receiving the 'BindingKeys' entry.
    """
    # TODO: allow an option --routing-key <key>
    options.update(BindingKeys=['#'])
def subscribe_to_notifications(options, channel, exchange, queue):
    """Bind the queue to the exchange for every configured binding key.

    Args:
        options: Dict whose 'BindingKeys' entry lists the topics.
        channel: Broker channel used for the bindings.
        exchange: Name of the exchange to bind to.
        queue: Name of the queue to bind.
    """
    topics = options['BindingKeys']
    for topic in topics:
        subscribe_to_topic(topic, channel, exchange, queue)
def subscribe_to_topic(binding_key, channel, exchange, queue):
    """Bind the queue to the exchange for one topic.

    Args:
        binding_key: Routing key pattern to subscribe to.
        channel: Broker channel offering queue_bind.
        exchange: Name of the exchange to bind to.
        queue: Name of the queue to bind.

    Raises:
        SystemExit: With status 8 when the channel has been closed.
    """
    try:
        channel.queue_bind(
            exchange=exchange,
            queue=queue,
            routing_key=binding_key,
        )
    except pika.exceptions.ChannelClosed as exception:
        # Errors go to stderr, like in get_connection, so they never mix
        # with the notifications printed on stdout.
        print("Channel error: {0}".format(exception), file=sys.stderr)
        sys.exit(8)
def consume_notifications(channel, queue):
    """Consume notifications from a queue.

    Registers on_broker_message as the consumer callback, then starts
    consuming on the channel.

    Args:
        channel: Broker channel offering basic_consume and
            start_consuming.
        queue: Name of the queue to consume from.
    """
    try:
        # pika >= 1.0: basic_consume(queue, on_message_callback, ...)
        channel.basic_consume(
            queue=queue,
            on_message_callback=on_broker_message,
        )
    except TypeError:
        # Older pika: basic_consume(consumer_callback, queue, ...) rejects
        # the unknown keyword before doing anything, so retrying with the
        # legacy call is safe.
        channel.basic_consume(on_broker_message, queue=queue)
    channel.start_consuming()
# -------------------------------------------------------------
# Application entry point
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def run_application():
    """Run the application.

    Builds the services container, fills the options, binds the queue
    to the configured topics and then consumes the notifications.
    """
    app = initialize_application()
    options, channel = app['options'], app['channel']
    exchange, queue = app['exchange'], app['queue']

    parse_arguments(options)
    subscribe_to_notifications(options, channel, exchange, queue)
    consume_notifications(channel, queue)
# Start the client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_application()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Mon, Jun 16, 00:17 (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2733647
Default Alt Text
notifications (6 KB)
Attached To
Mode
rNOTIFCLI Notifications center CLI client
Attached
Detach File
Event Timeline
Log In to Comment