Page MenuHomeDevCentral

D991.id2534.diff
No OneTemporary

D991.id2534.diff

diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -1,22 +1,217 @@
#!/usr/bin/env python3
-###
-# Notifications center
-# Delivery API
-###
+# -------------------------------------------------------------
+# Notifications center - Delivery API
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Author: Sébastien Santoro aka Dereckson
+# Project: Nasqueron
+# Description: AMQP to HTTP gateway
+# Created: 2017-05-22
+# Dependencies: Pika, direct access to the broker
+# -------------------------------------------------------------
-from flask import Flask
+"""
+This module connects to the message broker, fire a web server,
+and allow to interact through the broker from HTTP requests.
+"""
+
+from flask import Flask, abort, jsonify, request
+import uuid
+import pika
+import os
+import sys
+
+
+# -------------------------------------------------------------
+# Configuration
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
app = Flask(__name__)
+service = {}
endpoint = "/delivery"
+def mandatory_environment_variables():
+ return ["BROKER_HOST", "BROKER_USERNAME", "BROKER_PASSWORD"]
+
+
+def check_config():
+ """Ensure configuration is complete."""
+ return set(mandatory_environment_variables()).issubset(os.environ)
+
+
+def get_config():
+ return {
+ "Broker": get_broker_config(),
+ "DefaultExchange": get_default_exchange(),
+ }
+
+
+def get_broker_config():
+ return {
+ "Host": os.environ["BROKER_HOST"],
+ "User": os.environ["BROKER_USERNAME"],
+ "Password": os.environ["BROKER_PASSWORD"],
+ "Vhost": get_broker_vhost(),
+ }
+
+
+def get_broker_vhost():
+ if "BROKER_VHOST" in os.environ:
+ return os.environ["BROKER_VHOST"]
+
+ return "/"
+
+
+def get_default_exchange():
+ if "DEFAULT_EXCHANGE" in os.environ:
+ return os.environ["DEFAULT_EXCHANGE"]
+
+ return None
+
+
+# -------------------------------------------------------------
+# Broker connection helper methods
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def get_credentials(config):
+ """Get credentials to connect to the broker from the configuration."""
+ return pika.PlainCredentials(
+ username=config['Broker']['User'],
+ password=config['Broker']['Password'],
+ erase_on_connect=True
+ )
+
+
+def get_broker_connection_parameters(config):
+ return pika.ConnectionParameters(
+ host=config['Broker']['Host'],
+ credentials=get_credentials(config),
+ virtual_host=config['Broker']['Vhost'])
+
+
+def get_broker_connection(config):
+ """Connect to the broker."""
+ parameters = get_broker_connection_parameters(config)
+ return pika.BlockingConnection(parameters)
+
+
+# -------------------------------------------------------------
+# API helper methods
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def get_default_exchange_name():
+ """Gets from the configuration the default exchange,
+ or return a 400 if undefined."""
+ if service['config']['DefaultExchange'] is None:
+ abort(400)
+
+ return service['config']['DefaultExchange']
+
+
+def generate_queue_key():
+ """Generates an API key matching a specific queue."""
+ return str(uuid.uuid4())
+
+
+def add_broker_queue(exchange_name, routing_key):
+ """Add a queue to the broker, bind to the exchange."""
+ queue_key = generate_queue_key()
+ queue_name = "delivery-" + queue_key
+
+ connection = get_broker_connection(service['config'])
+ channel = connection.channel()
+ channel.queue_declare(durable=True,
+ queue=queue_name)
+ channel.queue_bind(exchange=exchange_name,
+ queue=queue_name,
+ routing_key=routing_key)
+ connection.close(reply_text="Operation done")
+
+ return queue_key
+
+
+def get_variable_from_request(key, default_value=None):
+ """Gets variable from request JSON payload or a default value."""
+ if key in request.json:
+ return request.json[key]
+
+ return default_value
+
+
+def get_exchange_from_request():
+ """Get the exchange name from the request, the environment, or abort."""
+ exchange_name = get_variable_from_request("exchange")
+
+ if exchange_name is None:
+ return get_default_exchange_name()
+
+ return exchange_name
+
+
+def error_handler(err):
+ """Logs the exception, return 400"""
+ print("Channel error: {0}".format(err))
+ abort(400)
+
+
+# -------------------------------------------------------------
+# API methods
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
@app.route(endpoint + "/status")
def status():
- '''Determine if the application returns a 200 on GET request.'''
+ """Determine if the application returns a 200 on GET request."""
return "ALIVE"
-if __name__ == "__main__":
- app.run(host="0.0.0.0")
+@app.route(endpoint + "/register_consumer", methods=['POST'])
+def register_consumer():
+ """Subscribe to an exchange, record the queue, send queue key."""
+
+ # Reads request
+ if not request.json:
+ abort(400)
+
+ exchange_name = get_exchange_from_request()
+ routing_key = get_variable_from_request("routing-key", "*")
+
+ # Handles request
+ try:
+ queue_key = add_broker_queue(exchange_name, routing_key)
+ except pika.exceptions.ChannelClosed as err:
+ return error_handler(err)
+
+ # Returns result
+ return jsonify(key=queue_key)
+
+
+# -------------------------------------------------------------
+# Start application
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def initialize_application():
+ """Initialize a container with required services."""
+ return {
+ 'config': get_config()
+ }
+
+
+def run_application(web_application):
+ """Run the server."""
+ if not check_config():
+ sys.exit(1)
+
+ global service
+ service = initialize_application()
+
+ if __name__ == "__main__":
+ web_application.run(host="0.0.0.0")
+
+
+run_application(app)
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@
MarkupSafe==1.0
mccabe==0.6.1
pep8==1.7.0
+pika==0.10.0
pycodestyle==2.3.1
pyflakes==1.5.0
Werkzeug==0.12.2

File Metadata

Mime Type
text/plain
Expires
Mon, Feb 3, 06:42 (20 h, 43 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2392341
Default Alt Text
D991.id2534.diff (6 KB)

Event Timeline