Page MenuHomeDevCentral

D992.id2536.diff
No OneTemporary

D992.id2536.diff

diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -117,10 +117,15 @@
return str(uuid.uuid4())
+def get_queue_name(queue_key):
+ """Map a queue name with its key."""
+ return "delivery-" + queue_key
+
+
def add_broker_queue(exchange_name, routing_key):
"""Add a queue to the broker, bind to the exchange."""
queue_key = generate_queue_key()
- queue_name = "delivery-" + queue_key
+ queue_name = get_queue_name(queue_key)
connection = get_broker_connection(service['config'])
channel = connection.channel()
@@ -134,6 +139,13 @@
return queue_key
+def delete_broker_queue(queue_name):
+ connection = get_broker_connection(service['config'])
+ channel = connection.channel()
+ channel.queue_delete(queue=queue_name)
+ connection.close(reply_text="Operation done")
+
+
def get_variable_from_request(key, default_value=None):
"""Gets variable from request JSON payload or a default value."""
if key in request.json:
@@ -152,6 +164,19 @@
return exchange_name
+def count_queue_messages(queue_name):
+ connection = get_broker_connection(service['config'])
+ channel = connection.channel()
+ result = channel.queue_declare(durable=True,
+ queue=queue_name,
+ passive=True)
+ return result.method.message_count
+
+
+def is_queue_empty(queue_name):
+ return count_queue_messages(queue_name) == 0
+
+
def error_handler(err):
"""Logs the exception, return 400"""
print("Channel error: {0}".format(err))
@@ -190,6 +215,28 @@
return jsonify(key=queue_key)
+@app.route(endpoint + "/unregister_consumer", methods=['POST'])
+def unregister_consumer():
+ """Unregister a queue key."""
+
+ # Reads request
+ if not request.json or "key" not in request.json:
+ abort(400)
+
+ queue = get_queue_name(request.json["key"])
+ force_delete = get_variable_from_request("force", False)
+
+ # Handles request
+ try:
+ if force_delete or is_queue_empty(queue):
+ delete_broker_queue(queue)
+ return jsonify(key=queue, succes=True, result="Deleted")
+ else:
+ return jsonify(key=queue, succes=False, result="Queue not empty")
+ except pika.exceptions.ChannelClosed as err:
+ return error_handler(err)
+
+
# -------------------------------------------------------------
# Start application
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 19, 08:20 (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2252241
Default Alt Text
D992.id2536.diff (2 KB)

Event Timeline