Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F3765771
D2754.id8447.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Referenced Files
None
Subscribers
None
D2754.id8447.diff
View Options
diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,10 @@
+# Rust
/target
+Cargo.lock
+
+# Python
+__pycache__
+
+# Data
/FANTOIR*
!/fantoir-datasource
-Cargo.lock
diff --git a/_pipelines/README.md b/_pipelines/README.md
new file mode 100644
--- /dev/null
+++ b/_pipelines/README.md
@@ -0,0 +1,14 @@
+## Nasqueron Datasources :: pipelines
+
+The dags/ directory contains pipelines as code (DAGs) for Apache Airflow.
+
+Those pipelines can be used:
+
+ - at Nasqueron, on our Airflow instance
+ - elsewhere, as sample documentation showing how to use our datasources
+   components and how to glue them together
+
+The nasqueron_datasources module is published to the dags/ folder too,
+so it is available from every DAG. It contains helper functions.
+
+Unit tests are available in the tests/ folder.
diff --git a/_pipelines/dags/fantoir_fetch.py b/_pipelines/dags/fantoir_fetch.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_fetch.py
@@ -0,0 +1,143 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > fetch
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+import json
+import requests
+
+from airflow.decorators import dag, task
+from airflow.models import Variable, TaskInstance
+from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+
+from nasqueron_datasources.pipelines.commands import run, parse_environment
+from nasqueron_datasources.pipelines.errors import CommandException, WorkflowException
+
+
+NOTIFICATION_URL = "https://notifications.nasqueron.org/gate/Notifications/Nasqueron"
+
+# Seconds to wait for the notifications gate to answer, so a stalled
+# connection can't keep the notify task running forever.
+NOTIFICATION_TIMEOUT = 30
+
+# Exit code of `fantoir-datasource fetch` when no new version is available.
+EXIT_CODE_NO_NEW_VERSION = 12
+
+
+@dag(
+    dag_id="fantoir_fetch",
+    schedule=None,
+    start_date=datetime(2023, 1, 1),
+    tags=["datasources", "fantoir", "download", "external"],
+    # Render templates as native Python objects: otherwise the
+    # environment dict pulled from XCom in call_import_dag conf
+    # would reach the import DAG as its string representation.
+    render_template_as_native_obj=True,
+)
+def fantoir_fetch_dag():
+    """
+    ### Pipeline for FANTOIR datasource - fetch
+
+    This pipeline checks if a new version of the FANTOIR file is published.
+
+    If so, it downloads it, extracts it and triggers the import DAG.
+
+    Reference: https://agora.nasqueron.org/Fantoir-datasource
+    """
+
+    @task(multiple_outputs=True)
+    def fetch() -> dict:
+        """
+        Fetches FANTOIR from data.economie.gouv.fr, if a new version is available.
+
+        Each key of the returned dict is pushed as its own XCom,
+        so downstream tasks can pull "new_version" and "environment".
+
+        :return: a dict with "new_version" (bool) and "environment"
+                 (VARIABLE=value lines printed by the command, as a dict)
+        :raises CommandException: if the fetch command fails
+        """
+        exit_code, stdout, stderr = run(
+            ["fantoir-datasource", "fetch"],
+            cwd=Variable.get("fantoir_directory"),
+            env={
+                "DATABASE_URL": "",  # a value is unneeded for fetch operation
+            },
+        )
+
+        if exit_code == EXIT_CODE_NO_NEW_VERSION:
+            return {
+                "new_version": False,
+                "environment": {},
+            }
+
+        if exit_code != 0:
+            raise CommandException(
+                "Can't fetch FANTOIR", exit_code, stderr.decode(errors="replace")
+            )
+
+        return {
+            "new_version": True,
+            # run() returns raw bytes, while parse_environment expects
+            # an iterable of "VARIABLE=value" text lines.
+            "environment": parse_environment(stdout.decode().splitlines()),
+        }
+
+    def is_new_version_available(task_instance: TaskInstance) -> bool:
+        """Tells if fetch found a new version: when False, downstream tasks are skipped."""
+        return bool(task_instance.xcom_pull(task_ids="fetch", key="new_version"))
+
+    check_fetch = ShortCircuitOperator(
+        task_id="check_fetch",
+        python_callable=is_new_version_available,
+        doc_md="""Determine if a new version is available from previous task.""",
+    )
+
+    # Only reached when a new version is available (see check_fetch).
+    call_import_dag = TriggerDagRunOperator(
+        task_id="call_import_dag",
+        trigger_dag_id="fantoir_import",
+        conf={
+            "fantoir_environment": "{{ task_instance.xcom_pull(task_ids='fetch', key='environment') }}"
+        },
+        doc_md="""Launch the workflow to import FANTOIR new version""",
+    )
+
+    @task
+    def notify(task_instance: TaskInstance):
+        """Sends a notification a new version has been fetched, with the import run id."""
+        environment = (
+            task_instance.xcom_pull(task_ids="fetch", key="environment") or {}
+        )
+        fantoir_file = environment.get("FANTOIR_FILE", "(unknown)")
+        # Run id of the import DAG run, as pushed by call_import_dag.
+        dag_run_id = task_instance.xcom_pull(
+            task_ids="call_import_dag", key="trigger_run_id"
+        )
+        notification = {
+            "service": "Airflow",
+            "project": "Nasqueron",
+            "group": "Datasources",
+            "type": "fantoir-fetch",
+            "text": f"A new version of FANTOIR has been fetched: {fantoir_file}. Triggering import workflow: {dag_run_id}.",
+        }
+
+        response = requests.post(
+            NOTIFICATION_URL,
+            data=json.dumps(notification),
+            timeout=NOTIFICATION_TIMEOUT,
+        )
+        if response.status_code != 200:
+            raise WorkflowException(
+                "Can't send notification: HTTP error " + str(response.status_code)
+            )
+
+    fetch() >> check_fetch >> call_import_dag >> notify()
+
+
+fantoir_fetch_dag()
diff --git a/_pipelines/dags/fantoir_import.py b/_pipelines/dags/fantoir_import.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_import.py
@@ -0,0 +1,124 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > import
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+
+from airflow.decorators import dag, task
+from airflow.models import Variable
+
+from nasqueron_datasources.pipelines.commands import run
+from nasqueron_datasources.pipelines.errors import CommandException
+
+
+@dag(
+    dag_id="fantoir_import",
+    schedule=None,
+    start_date=datetime(2023, 1, 1),
+    tags=["datasources", "fantoir", "postgresql", "external"],
+)
+def fantoir_import_dag():
+    """
+    ### Pipeline for FANTOIR datasource - import
+
+    This pipeline imports FANTOIR into PostgreSQL, enriches it
+    and promotes the new table as the one to use.
+
+    Enrichment is done by fetching information from:
+      - Wikidata
+
+    The fantoir_fetch DAG triggers it, passing the variables printed
+    by the fetch command as "fantoir_environment" in the run conf.
+
+    Reference: https://agora.nasqueron.org/Fantoir-datasource
+    """
+
+    # TODO: query the database URL from Vault
+    database_url = "to query from vault"
+
+    def run_fantoir_datasource(arguments):
+        """
+        Runs fantoir-datasource with the specified arguments.
+
+        :param arguments: the arguments to pass to fantoir-datasource, as a list
+        :raises CommandException: if the command exits with a non-zero code
+        """
+        # Variable.get is called at run time, not at DAG parsing time,
+        # so parsing this file doesn't query the Airflow metadata database.
+        exit_code, _, stderr = run(
+            ["fantoir-datasource", *arguments],
+            cwd=Variable.get("fantoir_directory"),
+            env={
+                "DATABASE_URL": database_url,
+            },
+        )
+
+        if exit_code != 0:
+            raise CommandException(
+                "Can't run fantoir-datasource " + " ".join(arguments),
+                exit_code,
+                stderr.decode(errors="replace"),
+            )
+
+    @task
+    def import_to_pgsql(dag_run=None):
+        """Imports the fetched FANTOIR file into PostgreSQL."""
+        # Templates written in a task body aren't rendered: read the run conf.
+        # NOTE(review): assumes the fetch command prints FANTOIR_TABLE too — confirm.
+        environment = dag_run.conf["fantoir_environment"]
+
+        run_fantoir_datasource(
+            [
+                "import",
+                environment["FANTOIR_FILE"],
+                environment["FANTOIR_TABLE"],
+                "-c",
+            ]
+        )
+
+    @task
+    def enrich_from_wikidata():
+        """Enriches the imported data with information from Wikidata."""
+        run_fantoir_datasource(["wikidata"])
+
+    @task
+    def promote():
+        """Promotes the new table as the one to use."""
+        run_fantoir_datasource(["promote"])
+
+    @task
+    def publish_to_configuration():
+        """
+        NOT IMPLEMENTED.
+
+        Publish new table name to use to etcd/consul
+        """
+
+    @task
+    def notify():
+        """
+        NOT IMPLEMENTED.
+
+        Send notification payload to Notifications Center
+        """
+
+    (
+        import_to_pgsql()
+        >> [
+            # Enrichment sources can run in parallel.
+            enrich_from_wikidata(),
+        ]
+        >> promote()
+        >> [
+            # Post-action tasks can run in parallel too.
+            publish_to_configuration(),
+            notify(),
+        ]
+    )
+
+
+fantoir_import_dag()
diff --git a/_pipelines/dags/nasqueron_datasources/__init__.py b/_pipelines/dags/nasqueron_datasources/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py b/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/commands.py b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
@@ -0,0 +1,88 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines :: command utilities
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Description: Helpers to handle commands in Python pipelines
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+import subprocess
+
+
+# -------------------------------------------------------------
+# Subprocess wrappers
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def run(command, cwd=None, env=None):
+    """
+    Runs the specified command and returns exit_code, stdout, stderr.
+
+    The command only receives the variables from env: the environment
+    of the current process, including PATH, is not inherited.
+    NOTE(review): pass PATH in env if the command can't be found otherwise.
+
+    :param command: The command to run, as a list [command, arg1, arg2, ...]
+                    or as a string to pass to the shell (to avoid)
+    :type command: str|list
+    :param cwd: The working directory for the command to run
+    :param env: The environment variables to pass to the software
+    :type env: dict|None
+
+    :return: (exit_code, stdout, stderr), stdout and stderr being bytes
+    """
+    if env is None:
+        env = {}
+
+    # Only a shell can split a command line given as a string into arguments.
+    shell = isinstance(command, str)
+    process = subprocess.run(
+        command, shell=shell, cwd=cwd, env=env, capture_output=True
+    )
+
+    return process.returncode, process.stdout, process.stderr
+
+
+# -------------------------------------------------------------
+# Environment
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def parse_environment(environment_lines):
+    """
+    Parses environment as a dictionary.
+
+    This method is intended to be used with `env`, with .env files,
+    or with any command offering a similar format:
+
+        VARIABLE=value
+
+    Values are parsed as strings and may contain "=" themselves.
+    Blank lines and comment lines starting with # are ignored.
+
+    :param environment_lines: an iterable of lines (e.g. a file object),
+                              or a whole output as a str or bytes
+    :return: a dictionary mapping variable names to their values
+    :raises ValueError: if a line isn't in the VARIABLE=value format
+    """
+    if isinstance(environment_lines, bytes):
+        environment_lines = environment_lines.decode()
+    if isinstance(environment_lines, str):
+        # Iterating over a str would yield characters, not lines.
+        environment_lines = environment_lines.splitlines()
+
+    environment = {}
+    for line in environment_lines:
+        if isinstance(line, bytes):
+            line = line.decode()
+        line = line.strip()
+        if not line or line.startswith("#"):
+            continue
+
+        variable, separator, value = line.partition("=")
+        if not separator:
+            raise ValueError("Can't parse environment line: " + line)
+        environment[variable] = value
+
+    return environment
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/errors.py b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
@@ -0,0 +1,36 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines :: errors
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+class WorkflowException(Exception):
+    """Base exception for errors occurring while running a pipeline."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class CommandException(WorkflowException):
+    """
+    Raised when an external command fails.
+
+    The exit code and the error output are included in the message,
+    and kept as the exit_code and stderr attributes.
+    """
+
+    def __init__(self, message, exit_code, stderr):
+        # Output captured from a subprocess is bytes: decode it,
+        # so the message shows text instead of a b'...' literal.
+        if isinstance(stderr, bytes):
+            stderr = stderr.decode(errors="replace")
+
+        self.exit_code = exit_code
+        self.stderr = stderr
+
+        consolidated_message = "{} (exit code {}): {}".format(
+            message, exit_code, stderr
+        )
+        super().__init__(consolidated_message)
diff --git a/_pipelines/requirements.txt b/_pipelines/requirements.txt
new file mode 100644
--- /dev/null
+++ b/_pipelines/requirements.txt
@@ -0,0 +1,2 @@
+apache-airflow~=2.5.0
+requests~=2.28.2
diff --git a/_pipelines/tests/files/env b/_pipelines/tests/files/env
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/files/env
@@ -0,0 +1,3 @@
+FOO=This is a sentence.
+QUUX=666
+BAR=
diff --git a/_pipelines/tests/test_commands.py b/_pipelines/tests/test_commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/test_commands.py
@@ -0,0 +1,24 @@
+import os
+import unittest
+
+from nasqueron_datasources.pipelines import commands
+
+
+# Resolved from this file location, so tests can run from any directory.
+FILES_DIRECTORY = os.path.join(os.path.dirname(__file__), "files")
+
+
+class TestCommands(unittest.TestCase):
+    def test_parse_environment(self):
+        expected = {
+            "FOO": "This is a sentence.",
+            "QUUX": "666",  # everything is parsed as a string
+            "BAR": "",  # an empty string is used instead of None for empty values
+        }
+
+        with open(os.path.join(FILES_DIRECTORY, "env"), encoding="utf-8") as fd:
+            self.assertDictEqual(expected, commands.parse_environment(fd))
+
+
+if __name__ == "__main__":
+    unittest.main()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 23, 14:10 (18 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2258197
Default Alt Text
D2754.id8447.diff (12 KB)
Attached To
Mode
D2754: Run fantoir-datasource as Airflow pipeline
Attached
Detach File
Event Timeline
Log In to Comment