Page MenuHomeDevCentral

D2754.id8447.diff
No OneTemporary

D2754.id8447.diff

diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,10 @@
+# Rust
/target
+Cargo.lock
+
+# Python
+__pycache__
+
+# Data
/FANTOIR*
!/fantoir-datasource
-Cargo.lock
diff --git a/_pipelines/README.md b/_pipelines/README.md
new file mode 100644
--- /dev/null
+++ b/_pipelines/README.md
@@ -0,0 +1,14 @@
+## Nasqueron Datasources :: pipelines
+
+The dags/ directory contains DAGs pipelines as code for Apache Airflow.
+
+Those pipelines can be used:
+
+ - at Nasqueron, on our Airflow instance
+ - elsewhere, as a sample documentation how to use our datasources
+ components and how to glue the components together
+
+The nasqueron_datasources module is published to the dags folder too,
+so is available from the different DAGs. It contains helper methods.
+
+Unit tests are available in tests/ folder.
diff --git a/_pipelines/dags/fantoir_fetch.py b/_pipelines/dags/fantoir_fetch.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_fetch.py
@@ -0,0 +1,115 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > fetch
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+import json
+import requests
+
+from airflow.decorators import dag, task
+from airflow.models import Variable, TaskInstance
+from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+
+from nasqueron_datasources.pipelines.commands import run, parse_environment
+from nasqueron_datasources.pipelines.errors import CommandException, WorkflowException
+
+
+NOTIFICATION_URL = "https://notifications.nasqueron.org/gate/Notifications/Nasqueron"
+
+
+@dag(
+ dag_id="fantoir_fetch",
+ schedule=None,
+ start_date=datetime(2023, 1, 1),
+ tags=["datasources", "fantoir", "download", "external"],
+)
+def fantoir_fetch_dag():
+ """
+ ### Pipeline for FANTOIR datasource - fetch
+
+ This pipeline checks if a new version of FANTOIR file is published.
+
+ If so it downloads it, extracts it and calls import DAG.
+
+ Reference: https://agora.nasqueron.org/Fantoir-datasource
+ """
+
+ @task
+ def fetch() -> dict:
+ """Fetches FANTOIR from data.economie.gouv.fr, if a new version is available."""
+ exit_code, stdout, stderr = run(
+ ["fantoir-datasource", "fetch"],
+ cwd=Variable.get("fantoir_directory"),
+ env={
+ "DATABASE_URL": "", # a value is unneeded for fetch operation
+ },
+ )
+
+ if exit_code == 12:
+ # No new version available
+ return {
+ "new_version": False,
+ "environment": {},
+ }
+
+ if exit_code != 0:
+ # Failure
+ raise CommandException("Can't fetch FANTOIR", exit_code, stderr)
+
+ return {
+ "new_version": True,
+ "environment": parse_environment(stdout),
+ }
+
+ def is_new_version_available(task_instance: TaskInstance) -> bool:
+ return task_instance.xcom_pull(task_ids="fetch", key="new_version")
+
+ check_fetch = ShortCircuitOperator(
+ task_id="check_fetch",
+ python_callable=is_new_version_available,
+ doc_md="""Determine if a new version is available from previous task.""",
+ )
+
+ # Triggered by fantoir_fetch DAG, as a new version is available.
+ call_import_dag = TriggerDagRunOperator(
+ task_id="call_import_dag",
+ trigger_dag_id="fantoir_import",
+ conf={
+ "fantoir_environment": "{{ task_instance.xcom_pull(task_ids='fetch', key='environment') }}"
+ },
+ doc_md="""Launch the workflow to import FANTOIR new version""",
+ )
+
+ @task
+ def notify(task_instance: TaskInstance):
+ """Sends a notification a new version is available."""
+
+ fantoir_file = task_instance.xcom_pull(task_ids="fetch", key="environment").get(
+ "FANTOIR_FILE", "(unknown)"
+ )
+ dag_run_id = task_instance.xcom_pull(
+ task_id="call_import_dag", key="trigger_run_id"
+ )
+ notification = {
+ "service": "Airflow",
+ "project": "Nasqueron",
+ "group": "Datasources",
+ "type": "fantoir-fetch",
+ "text": f"A new version of FANTOIR has been fetched: {fantoir_file}. Triggering import workflow: {dag_run_id}.",
+ }
+
+ response = requests.post(NOTIFICATION_URL, data=json.dumps(notification))
+ if response.status_code != 200:
+ raise WorkflowException(
+ "Can't send notification: HTTP error " + str(response.status_code)
+ )
+
+ fetch() >> check_fetch >> call_import_dag >> notify()
+
+
+fantoir_fetch_dag()
diff --git a/_pipelines/dags/fantoir_import.py b/_pipelines/dags/fantoir_import.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_import.py
@@ -0,0 +1,108 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > import
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+
+from airflow.decorators import dag, task
+from airflow.models import Variable
+
+from nasqueron_datasources.pipelines.commands import run
+
+
+@dag(
+ dag_id="fantoir_import",
+ schedule=None,
+ start_date=datetime(2023, 1, 1),
+ tags=["datasources", "fantoir", "postgresql", "external"],
+)
+def fantoir_import_dag():
+ """
+ ### Pipeline for FANTOIR datasource - import
+
+ This pipeline imports FANTOIR into PostgreSQL, enriches it
+ and promotes the new table as the one to use.
+
+ Enrichment is done by fetching information from:
+ - Wikidata
+
+ Reference: https://agora.nasqueron.org/Fantoir-datasource
+ """
+
+ fantoir_directory = Variable.get("fantoir_directory")
+ database_url = "to query from vault"
+
+ @task
+ def import_to_pgsql():
+ run(
+ [
+ "fantoir-datasource",
+ "import",
+ "{{ params['FANTOIR_FILE'] }}",
+ "{{ params['FANTOIR_TABLE'] }}",
+ "-c",
+ ],
+ cwd=fantoir_directory,
+ env={
+ "DATABASE_URL": database_url,
+ },
+ )
+
+ @task
+ def enrich_from_wikidata():
+ run(
+ ["fantoir-datasource", "wikidata"],
+ cwd=fantoir_directory,
+ env={
+ "DATABASE_URL": database_url,
+ },
+ )
+
+ @task
+ def promote():
+ run(
+ ["fantoir-datasource", "promote"],
+ cwd=fantoir_directory,
+ env={
+ "DATABASE_URL": database_url,
+ },
+ )
+
+ @task
+ def publish_to_configuration():
+ """
+ NOT IMPLEMENTED.
+
+ Publish new table name to use to etcd/consul
+ """
+ pass
+
+ @task
+ def notify():
+ """
+ NOT IMPLEMENTED.
+
+ Send notification payload to Notifications Center
+ """
+ pass
+
+ (
+ import_to_pgsql()
+ >> [
+ # Enrichment sources can run in //.
+ enrich_from_wikidata(),
+ ]
+ >> promote()
+ >> [
+ # Post-action tasks can run in // too.
+ publish_to_configuration(),
+ notify(),
+ ]
+ )
+
+
+fantoir_import_dag()
diff --git a/_pipelines/dags/nasqueron_datasources/__init__.py b/_pipelines/dags/nasqueron_datasources/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py b/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/commands.py b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
@@ -0,0 +1,57 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines :: command utilities
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Description: Helpers to handle commands in Python pipelines
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+import subprocess
+
+
+# -------------------------------------------------------------
+# Subprocess wrappers
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def run(command, cwd=None, env=None):
+ """
+ Runs the specified command and return exit_code, stdout, stderr.
+
+ :type env: dict|None
+ :param env: The environment variables to pass to the software
+ :type command: string|list
+ :param command: The command to run, as a string to pass to shell (to avoid) or a list [command, arg1, arg2, ...]
+ :param cwd: The working directory for the command to run
+
+ :return: (exit_code, stdout, stderr)
+ """
+ if env is None:
+ env = {}
+ shell = type(command) is str
+ process = subprocess.run(
+ command, shell=shell, cwd=cwd, env=env, capture_output=True
+ )
+
+ return process.returncode, process.stdout, process.stderr
+
+
+# -------------------------------------------------------------
+# Environment
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def parse_environment(environment_lines):
+ """
+ Parses environment as a dictionary.
+
+ This method is intended to be used with `env`, with .env files,
+ or with any command offering a similar format:
+
+ VARIABLE=value
+ """
+ return {
+ parts[0]: parts[1]
+ for parts in [line.strip().split("=") for line in environment_lines]
+ }
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/errors.py b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
@@ -0,0 +1,19 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines : errors
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+class WorkflowException(Exception):
+ def __init__(self, message):
+ super(WorkflowException, self).__init__(message)
+
+
+class CommandException(WorkflowException):
+ def __init__(self, message, exit_code, stderr):
+ consolidated_message = "{} (exit code {}): {}".format(
+ message, exit_code, stderr
+ )
+ super(CommandException, self).__init__(consolidated_message)
diff --git a/_pipelines/requirements.txt b/_pipelines/requirements.txt
new file mode 100644
--- /dev/null
+++ b/_pipelines/requirements.txt
@@ -0,0 +1,2 @@
+apache-airflow~=2.5.0
+requests~=2.28.2
diff --git a/_pipelines/tests/files/env b/_pipelines/tests/files/env
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/files/env
@@ -0,0 +1,3 @@
+FOO=This is a sentence.
+QUUX=666
+BAR=
diff --git a/_pipelines/tests/test_commands.py b/_pipelines/tests/test_commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/test_commands.py
@@ -0,0 +1,18 @@
+from nasqueron_datasources.pipelines import commands
+import unittest
+
+
+class TestCommands(unittest.TestCase):
+ def test_parse_environment(self):
+ expected = {
+ "FOO": "This is a sentence.",
+ "QUUX": "666", # everything is parsed as a string
+ "BAR": "", # an empty string is used instead of None for empty values
+ }
+
+ with open("files/env") as fd:
+ self.assertDictEqual(expected, commands.parse_environment(fd))
+
+
+if __name__ == "__main__":
+ unittest.main()

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 14:10 (18 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2258197
Default Alt Text
D2754.id8447.diff (12 KB)

Event Timeline