Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Paste
P167
notifications
Active
Public
Actions
Authored by
dereckson
on Feb 20 2016, 12:37.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
None
Referenced Files
F22273: notifications
Feb 20 2016, 12:37
2016-02-20 12:37:27 (UTC+0)
Subscribers
None
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Description: Connects to the message broker, subscribes to
# the notifications exchange, consumes messages,
# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
import
configparser
import
json
import
pika
import
sys
import
time
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
getConfig
():
config
=
configparser
.
ConfigParser
()
config
.
read
(
'/usr/local/etc/notifications.conf'
)
return
config
def
getCredentials
(
config
):
return
pika
.
PlainCredentials
(
username
=
config
[
'Broker'
][
'User'
],
password
=
config
[
'Broker'
][
'Password'
],
erase_on_connect
=
True
)
def
getBrokerConnection
(
config
):
parameters
=
pika
.
ConnectionParameters
(
host
=
config
[
'Broker'
][
'Host'
],
virtual_host
=
config
[
'Broker'
][
'Vhost'
],
credentials
=
getCredentials
(
config
)
)
return
pika
.
BlockingConnection
(
parameters
)
def
getExchange
(
config
):
return
config
[
'Broker'
][
'Exchange'
]
def
getBrokerQueue
(
channel
,
exchange
):
channel
.
exchange_declare
(
exchange
=
exchange
,
type
=
'topic'
)
result
=
channel
.
queue_declare
(
exclusive
=
True
)
return
result
.
method
.
queue
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
getNotificationFormat
():
return
"[
{time}
] <
{project}
/
{group}
>
{text}
"
def
getNotificationText
(
notification
):
text
=
notification
[
'text'
]
if
notification
[
'link'
]:
text
+=
" — "
+
notification
[
'link'
]
return
text
def
formatNotification
(
notificationMessage
):
notification
=
json
.
loads
(
notificationMessage
)
return
getNotificationFormat
()
.
format
(
time
=
time
.
strftime
(
"%H:%M:%S"
),
project
=
notification
[
'project'
],
group
=
notification
[
'group'
],
text
=
getNotificationText
(
notification
)
)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
onBrokerMessage
(
channel
,
basic_deliver
,
properties
,
body
):
notification
=
formatNotification
(
body
.
decode
(
"utf-8"
))
print
(
notification
)
channel
.
basic_ack
(
basic_deliver
.
delivery_tag
)
# -------------------------------------------------------------
# Services providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
getConnection
(
config
):
try
:
return
getBrokerConnection
(
config
)
except
pika
.
exceptions
.
ProbableAccessDeniedError
:
print
(
"Can't login to the broker: it's probably an access denied case."
,
file
=
sys
.
stderr
)
sys
.
exit
(
2
)
def
getChannel
(
config
):
return
getConnection
(
config
)
.
channel
()
def
getQueue
(
channel
,
exchange
):
try
:
return
getBrokerQueue
(
channel
,
exchange
)
except
pika
.
exceptions
.
ChannelClosed
as
e
:
print
(
"Channel error:
{0}
"
.
format
(
e
))
sys
.
exit
(
4
)
# -------------------------------------------------------------
# Services container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
initializeApplication
():
app
=
{}
app
[
'config'
]
=
getConfig
()
app
[
'options'
]
=
{}
app
[
'channel'
]
=
getChannel
(
app
[
'config'
])
app
[
'exchange'
]
=
getExchange
(
app
[
'config'
])
app
[
'queue'
]
=
getQueue
(
app
[
'channel'
],
app
[
'exchange'
])
return
app
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
parseArguments
(
options
):
#Todo: allows an option --routing-key <key>
options
[
'BindingKeys'
]
=
[
'#'
]
def
subscribeToNotifications
(
options
,
channel
,
exchange
,
queue
):
for
binding_key
in
options
[
'BindingKeys'
]:
subscribeToNotificationsForTopic
(
binding_key
,
channel
,
exchange
,
queue
)
def
subscribeToNotificationsForTopic
(
binding_key
,
channel
,
exchange
,
queue
):
try
:
channel
.
queue_bind
(
exchange
=
exchange
,
queue
=
queue
,
routing_key
=
binding_key
)
except
pika
.
exceptions
.
ChannelClosed
as
e
:
print
(
"Channel error:
{0}
"
.
format
(
e
))
sys
.
exit
(
8
)
def
consumeNotifications
(
channel
,
queue
):
channel
.
basic_consume
(
onBrokerMessage
,
queue
=
queue
)
channel
.
start_consuming
()
# -------------------------------------------------------------
# Procedural code
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
app
=
initializeApplication
();
parseArguments
(
app
[
'options'
]);
subscribeToNotifications
(
app
[
'options'
],
app
[
'channel'
],
app
[
'exchange'
],
app
[
'queue'
])
consumeNotifications
(
app
[
'channel'
],
app
[
'queue'
])
Event Timeline
dereckson
edited the content of this paste.
(Show Details)
Feb 20 2016, 12:37
2016-02-20 12:37:27 (UTC+0)
dereckson
changed the title of this paste from untitled to
notifications
.
Log In to Comment