Page MenuHomeDevCentral

D2718.diff
No OneTemporary

D2718.diff

diff --git a/.arclint b/.arclint
--- a/.arclint
+++ b/.arclint
@@ -17,20 +17,20 @@
"pep8": {
"type": "pep8",
"include": [
- "(\\.py$)",
- "(^notifications$)"
+ "(\\.py$)"
],
"severity": {
- "E401": "warning"
+ "E401": "warning",
+ "E501": "advice"
}
},
"flake8": {
"type": "flake8",
"include": [
- "(\\.py$)",
- "(^notifications$)"
+ "(\\.py$)"
],
"severity": {
+ "E501": "advice",
"E901": "advice"
}
}
diff --git a/.gitignore b/.gitignore
new file mode 100644
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__/
diff --git a/announcer.py b/announcer.py
new file mode 100644
--- /dev/null
+++ b/announcer.py
@@ -0,0 +1,18 @@
+import queue
+
+
+class MessageAnnouncer:
+ def __init__(self):
+ self.listeners = []
+
+ def listen(self):
+ q = queue.Queue(maxsize=5)
+ self.listeners.append(q)
+ return q
+
+ def announce(self, msg):
+ for i in reversed(range(len(self.listeners))):
+ try:
+ self.listeners[i].put_nowait(msg)
+ except queue.Full:
+ del self.listeners[i]
diff --git a/auth.py b/auth.py
new file mode 100644
--- /dev/null
+++ b/auth.py
@@ -0,0 +1,20 @@
+import hvac
+
+
+def login(config):
+ client = hvac.Client()
+
+ client.auth.approle.login(
+ role_id=config["Vault"]["Role_id"],
+ secret_id=config["Vault"]["Secret_id"],
+ )
+ return client
+
+
+def get_credentials(client, config):
+ read_response = client.secrets.kv.read_secret_version(
+ path=config["Vault"]["Path"], mount_point=config["Vault"]["Mount_Point"]
+ )
+ config["Broker"]["User"] = read_response["data"]["data"]["User"]
+ config["Broker"]["Password"] = read_response["data"]["data"]["Password"]
+ return config
diff --git a/broker.py b/broker.py
new file mode 100644
--- /dev/null
+++ b/broker.py
@@ -0,0 +1,33 @@
+import pika
+
+
+def get_credentials(config):
+ """Get credentials to connect to the broker from the configuration."""
+ return pika.PlainCredentials(
+ username=config["Broker"]["User"],
+ password=config["Broker"]["Password"],
+ erase_on_connect=True,
+ )
+
+
+def get_broker_connection(config):
+ """Connect to the broker."""
+ parameters = pika.ConnectionParameters(
+ host=config["Broker"]["Host"],
+ virtual_host=config["Broker"]["Vhost"],
+ credentials=get_credentials(config),
+ )
+ return pika.BlockingConnection(parameters)
+
+
+def get_exchange(config):
+ """Get exchange point name from the configuration."""
+ return config["Broker"]["Exchange"]
+
+
+def get_broker_queue(channel, exchange):
+ """Ensure exchange exists and declare a temporary queue."""
+ # channel.exchange_declare(exchange=exchange, exchange_type="topic", durable=True)
+ result = channel.queue_declare("", exclusive=True)
+ channel.queue_bind(result.method.queue, exchange, routing_key="#")
+ return result.method.queue
diff --git a/config.py b/config.py
new file mode 100644
--- /dev/null
+++ b/config.py
@@ -0,0 +1,17 @@
+import configparser
+import os
+import sys
+
+
+def get_config_file():
+ try:
+ return os.environ["API_SSE_CONFIG"]
+ except KeyError:
+ print("The ENV variable 'API_SSE_CONFIG' is missing", file=sys.stderr)
+ sys.exit(1)
+
+
+def get_config():
+ config = configparser.ConfigParser()
+ config.read(get_config_file())
+ return config
diff --git a/dev-requirements.txt b/dev-requirements.txt
new file mode 100644
--- /dev/null
+++ b/dev-requirements.txt
@@ -0,0 +1,3 @@
+black==22.12.0
+flake8==6.0.0
+pep8==1.7.1
diff --git a/rabbitmq-flask-client.py b/rabbitmq-flask-client.py
new file mode 100644
--- /dev/null
+++ b/rabbitmq-flask-client.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+
+from flask import Flask, Response
+from threading import Thread
+from flask_cors import CORS
+from pika.exceptions import ChannelClosed, AMQPConnectionError
+import logging
+import time
+
+from announcer import MessageAnnouncer
+import broker
+import config
+
+app = Flask(__name__)
+cors = CORS(app, resources={r"/*": {"origins": "*"}})
+
+config = config.get_config()
+
+if "Vault" in config:
+ import auth
+
+ client = auth.login(config)
+ config = auth.get_credentials(client, config)
+
+
+@app.route("/", methods=["GET"])
+def get_notifications():
+ def stream():
+ messages = announcer.listen()
+ while True:
+ msg = messages.get()
+ yield msg
+
+ return Response(stream(), mimetype="text/event-stream")
+
+
+def on_notification_received(message_announcer, channel, method, body):
+ message = "event : notification \ndata: " + body.decode() + "\n\n"
+ message_announcer.announce(message)
+ channel.basic_ack(delivery_tag=method.delivery_tag)
+
+
+def listen_notifications():
+ while True:
+ retries = 0
+ success = False
+
+ while retries < 13:
+ try:
+ connection = broker.get_broker_connection(config)
+ success = True
+ logging.info("Service started")
+ break
+ except AMQPConnectionError as e:
+ delay = 2**retries / 10
+ logging.warning(f"Can't connect to RabbitMQ: {e}")
+ logging.debug(f"Retrying in {delay}s")
+ time.sleep(delay)
+ retries += 1
+
+ if not success:
+ raise RuntimeError(
+ "Can't connect to AMQP, giving up after too many retries."
+ )
+
+ channel = connection.channel()
+ queue = broker.get_broker_queue(channel, broker.get_exchange(config))
+
+ channel.basic_consume(
+ queue=queue,
+ on_message_callback=lambda ch, method, properties, body: on_notification_received(
+ announcer, ch, method, body
+ ),
+ auto_ack=False,
+ )
+ try:
+ channel.start_consuming()
+ except (ChannelClosed, AMQPConnectionError) as e:
+ logging.warning(f"Connection issue: {e}")
+
+
+def app_init():
+ thread = Thread(target=listen_notifications)
+ thread.daemon = True
+ thread.start()
+
+
+announcer = MessageAnnouncer()
+app_init()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+hvac>=1.0.2,<2.0
+Flask>=2.2.2,<3.0
+Flask-Cors>=3.0.10,<4.0
+pika>=1.3.1,<2.0

File Metadata

Mime Type
text/plain
Expires
Mon, Dec 23, 14:12 (18 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2312602
Default Alt Text
D2718.diff (6 KB)

Event Timeline