diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -16,6 +16,7 @@
 """
 
 from flask import Flask, abort, jsonify, request
+import base64
 import uuid
 import pika
 import os
@@ -98,6 +99,43 @@
     return pika.BlockingConnection(parameters)
 
 
+# -------------------------------------------------------------
+# Message format helper methods
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def get_valid_formats():
+    """Return the list of message formats accepted by the API."""
+    return [
+        "as-is",
+        "base64"
+    ]
+
+
+def is_valid_format(message_format):
+    """Return True if message_format is one of the valid formats."""
+    return message_format in get_valid_formats()
+
+
+def format_message(message_format, message):
+    """Convert a raw message body (bytes) into a string.
+
+    "as-is" decodes the body as UTF-8 and raises UnicodeDecodeError if
+    the body is not valid UTF-8; "base64" returns the Base64 encoding
+    of the body as ASCII text.
+
+    Raises ValueError for any other format.
+    """
+    if message_format == "as-is":
+        return message.decode("utf-8")
+
+    if message_format == "base64":
+        return base64.b64encode(message).decode("ascii")
+
+    # str.format() keeps this a ValueError even for non-string formats
+    raise ValueError('Unknown format: {0}'.format(message_format))
+
+
 # -------------------------------------------------------------
 # API helper methods
 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -183,6 +221,33 @@
         abort(400)
 
 
+def get_single_message(queue, message_format, no_ack):
+    """Fetch at most one message from the queue.
+
+    Returns a dict with the delivery tag ("key"), the format and the
+    formatted content, or an empty dict when no message was returned.
+    """
+    connection = get_broker_connection(service['config'])
+    try:
+        channel = connection.channel()
+
+        method_frame, _properties, body = channel.basic_get(
+            queue=queue, no_ack=no_ack)
+        if method_frame:
+            return {
+                "key": method_frame.delivery_tag,
+                "format": message_format,
+                "content": format_message(message_format, body)
+            }
+
+        return {}
+    finally:
+        # Always release the broker connection, even on errors.
+        # NOTE(review): an un-acked message is presumably requeued here.
+        if connection.is_open:
+            connection.close()
+
+
 # -------------------------------------------------------------
 # API methods
 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -235,6 +300,36 @@
         return error_handler(err)
 
 
+@app.route(endpoint + "/get", methods=['POST'])
+def get():
+    """Consume a message from the queue.
+
+    Requires a JSON body containing "key". "format" and "no-ack" are
+    read through get_variable_from_request with the defaults "base64"
+    and False. Aborts with 400 if the body or "key" is missing or the
+    format is not valid.
+    """
+
+    # Reads request
+    if not request.json or "key" not in request.json:
+        abort(400)
+
+    queue = get_queue_name(request.json["key"])
+    message_format = get_variable_from_request("format", "base64")
+    no_ack = get_variable_from_request("no-ack", False)
+
+    if not is_valid_format(message_format):
+        print("Invalid format: {0}".format(message_format))
+        abort(400)
+
+    # Handles request
+    try:
+        message = get_single_message(queue, message_format, no_ack)
+        return jsonify(message)
+    except pika.exceptions.ChannelClosed as err:
+        return error_handler(err)
+
+
 # -------------------------------------------------------------
 # Start application
 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -