Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F22273
notifications
No One
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Authored By
dereckson
Feb 20 2016, 12:37
2016-02-20 12:37:27 (UTC+0)
Size
5 KB
Referenced Files
None
Subscribers
None
notifications
View Options
#!/usr/bin/env python3
# -------------------------------------------------------------
# Notifications center - CLI client
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Author: Sébastien Santoro aka Dereckson
# Project: Nasqueron
# Created: 2016-01-27
# Description: Connects to the message broker, subscribes to
# the notifications exchange, consumes messages,
# prints them on the console
# Dependencies: Pika, direct access to the broker
# -------------------------------------------------------------
import
configparser
import
json
import
pika
import
sys
import
time
# -------------------------------------------------------------
# Helper functions to parse config and connect to the broker
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getConfig():
    """Load the client configuration.

    Parses /usr/local/etc/notifications.conf with configparser and returns
    the ConfigParser instance. Note that ConfigParser.read() skips files it
    cannot open, so a missing file yields an empty configuration here.
    """
    parser = configparser.ConfigParser()
    parser.read('/usr/local/etc/notifications.conf')
    return parser
def getCredentials(config):
    """Build the pika credentials object from the [Broker] section.

    Reads the 'User' and 'Password' entries of config['Broker'] and passes
    erase_on_connect=True to pika.PlainCredentials.
    """
    broker = config['Broker']
    return pika.PlainCredentials(
        username=broker['User'],
        password=broker['Password'],
        erase_on_connect=True
    )
def getBrokerConnection(config):
    """Open a blocking connection to the broker described by config.

    The host and virtual host come from the 'Host' and 'Vhost' entries of
    config['Broker']; credentials are built by getCredentials(config).
    Exceptions raised by pika are not handled here.
    """
    broker = config['Broker']
    settings = pika.ConnectionParameters(
        host=broker['Host'],
        virtual_host=broker['Vhost'],
        credentials=getCredentials(config)
    )
    return pika.BlockingConnection(settings)
def getExchange(config):
    """Return the exchange name stored under config['Broker']['Exchange']."""
    broker_section = config['Broker']
    return broker_section['Exchange']
def getBrokerQueue(channel, exchange):
    """Declare the topic exchange and an exclusive queue, return the queue name.

    Parameters:
        channel: an open pika channel.
        exchange: name of the exchange to declare as a 'topic' exchange.

    Returns:
        The name of the declared queue, read from the declare result
        (result.method.queue).
    """
    # `exchange_type` is the keyword documented by pika; the legacy `type`
    # alias was deprecated and is no longer accepted by pika 1.x.
    channel.exchange_declare(exchange=exchange, exchange_type='topic')

    # An empty queue name asks the broker to pick one. It is passed
    # explicitly because pika 1.x makes `queue` a required argument,
    # while older releases already defaulted it to ''.
    result = channel.queue_declare(queue='', exclusive=True)
    return result.method.queue
# -------------------------------------------------------------
# Helper functions to format the output
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getNotificationFormat():
    """Return the str.format() template used to print one notification.

    The template has four named placeholders: time, project, group, text.
    """
    template = "[{time}] <{project}/{group}> {text}"
    return template
def getNotificationText(notification):
    """Return the notification text, followed by its link when there is one.

    Parameters:
        notification: dict with a 'text' entry and an optional 'link' entry.

    Returns:
        notification['text'], with " — <link>" appended when 'link' is
        present and non-empty.

    Raises:
        KeyError: if the 'text' entry is missing.
    """
    text = notification['text']

    # 'link' is optional: a missing key is treated like an empty link
    # instead of raising KeyError.
    link = notification.get('link')
    if link:
        text += " — " + link

    return text
def formatNotification(notificationMessage):
    """Render a JSON-encoded notification as one console line.

    The message is parsed with json.loads; its 'project' and 'group' entries,
    the text built by getNotificationText() and the current local time
    (HH:MM:SS) are substituted into the template from getNotificationFormat().
    """
    payload = json.loads(notificationMessage)
    template = getNotificationFormat()
    fields = {
        'time': time.strftime("%H:%M:%S"),
        'project': payload['project'],
        'group': payload['group'],
        'text': getNotificationText(payload),
    }
    return template.format(**fields)
# -------------------------------------------------------------
# Callbacks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def onBrokerMessage(channel, basic_deliver, properties, body):
    """Consumer callback: print the received notification, then acknowledge it.

    Parameters:
        channel: channel the message was delivered on; used to send the ack.
        basic_deliver: delivery information; its delivery_tag is acknowledged.
        properties: message properties (unused).
        body: raw message payload as bytes, expected to be UTF-8 encoded JSON.

    A payload that cannot be decoded, parsed or formatted is reported on
    stderr and skipped, so one malformed message does not stop the consumer.
    The message is acknowledged in both cases.
    """
    try:
        notification = formatNotification(body.decode("utf-8"))
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers invalid UTF-8 and invalid JSON; KeyError and
        # TypeError cover payloads that lack the expected entries or are
        # not a JSON object.
        print("Skipping malformed notification: {0}".format(e),
              file=sys.stderr)
    else:
        print(notification)

    channel.basic_ack(basic_deliver.delivery_tag)
# -------------------------------------------------------------
# Service providers
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getConnection(config):
    """Return a broker connection, or exit when access is probably denied.

    Delegates to getBrokerConnection(config). When pika raises
    ProbableAccessDeniedError, a message is written to stderr and the
    process exits with status 2.
    """
    try:
        connection = getBrokerConnection(config)
    except pika.exceptions.ProbableAccessDeniedError:
        message = "Can't login to the broker: it's probably an access denied case."
        print(message, file=sys.stderr)
        sys.exit(2)
    return connection
def getChannel(config):
    """Connect to the broker through getConnection() and open a channel."""
    connection = getConnection(config)
    return connection.channel()
def getQueue(channel, exchange):
    """Return the name of the queue to consume, or exit on a channel error.

    Delegates to getBrokerQueue(channel, exchange). When pika raises
    ChannelClosed, the error is reported and the process exits with
    status 4.
    """
    try:
        return getBrokerQueue(channel, exchange)
    except pika.exceptions.ChannelClosed as e:
        # Errors go to stderr, like the connection error in getConnection(),
        # so they do not mix with the notifications printed on stdout.
        print("Channel error: {0}".format(e), file=sys.stderr)
        sys.exit(4)
# -------------------------------------------------------------
# Service container
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def initializeApplication():
    """Build the application container.

    Returns a dict with the keys 'config', 'options' (initially empty),
    'channel', 'exchange' and 'queue', each obtained from the matching
    helper in this file.
    """
    config = getConfig()
    channel = getChannel(config)
    exchange = getExchange(config)
    queue = getQueue(channel, exchange)

    return {
        'config': config,
        'options': {},
        'channel': channel,
        'exchange': exchange,
        'queue': queue,
    }
# -------------------------------------------------------------
# Main tasks
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def parseArguments(options):
    """Fill the options dict in place; command-line arguments are not read yet.

    Sets options['BindingKeys'] to ['#'].
    NOTE(review): '#' is presumably meant as the topic wildcard that matches
    every routing key; confirm against the broker documentation.
    """
    # TODO: allow an option --routing-key <key>
    binding_keys = ['#']
    options['BindingKeys'] = binding_keys
def subscribeToNotifications(options, channel, exchange, queue):
    """Bind the queue to the exchange once per key in options['BindingKeys']."""
    keys = options['BindingKeys']
    for key in keys:
        subscribeToNotificationsForTopic(key, channel, exchange, queue)
def subscribeToNotificationsForTopic(binding_key, channel, exchange, queue):
    """Bind the queue to the exchange for one binding key.

    Parameters:
        binding_key: routing key pattern passed to queue_bind.
        channel: open pika channel used to issue the bind.
        exchange: name of the exchange to bind to.
        queue: name of the queue to bind.

    When pika raises ChannelClosed, the error is reported and the process
    exits with status 8.
    """
    try:
        channel.queue_bind(exchange=exchange,
                           queue=queue,
                           routing_key=binding_key)
    except pika.exceptions.ChannelClosed as e:
        # Errors go to stderr, like the connection error in getConnection(),
        # so they do not mix with the notifications printed on stdout.
        print("Channel error: {0}".format(e), file=sys.stderr)
        sys.exit(8)
def consumeNotifications(channel, queue):
    """Register onBrokerMessage as consumer of the queue and start consuming.

    NOTE(review): the callback-first call form below matches pika 0.x;
    pika 1.x documents basic_consume(queue, on_message_callback). Confirm
    which pika version is installed.
    """
    callback = onBrokerMessage
    channel.basic_consume(callback, queue=queue)
    channel.start_consuming()
# -------------------------------------------------------------
# Procedural code
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Build the container, fill in the options, bind the queue, then consume
# notifications.
app = initializeApplication()
parseArguments(app['options'])
subscribeToNotifications(
    app['options'],
    app['channel'],
    app['exchange'],
    app['queue']
)
consumeNotifications(app['channel'], app['queue'])
File Metadata
Details
Attached
Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
19950
Default Alt Text
notifications (5 KB)
Attached To
Mode
P167 notifications
Attached
Detach File
Event Timeline
Log In to Comment