Page MenuHomeDevCentral

D2754.id8458.diff
No OneTemporary

D2754.id8458.diff

diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,10 @@
+# Rust
/target
+Cargo.lock
+
+# Python
+__pycache__
+
+# Data
/FANTOIR*
!/fantoir-datasource
-Cargo.lock
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,41 @@
+# -------------------------------------------------------------
+# Nasqueron - Docker image for Nasqueron Datasources
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+# -------------------------------------------------------------
+# Builder phase
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+FROM debian:bookworm AS builder
+
+# Builder stage: installs the build toolchain and the Rust stable toolchain
+# through rustup, then compiles every datasource with `make all`.
+# apt-get is used instead of apt, as apt has no stable CLI for scripts.
+RUN apt-get update && \
+    apt-get install -y git curl build-essential pkg-config \
+        libpq-dev libssl-dev && \
+    rm -rf /var/lib/apt/lists/* && \
+    mkdir -p /opt/datasources && \
+    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
+        sh -s -- --default-toolchain stable -y
+
+WORKDIR /opt/datasources
+# COPY is enough for a plain local copy: ADD's extra features are unneeded.
+COPY . ./
+
+RUN make all
+
+# -------------------------------------------------------------
+# Release phase
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+FROM debian:bookworm
+# MAINTAINER is deprecated; the equivalent metadata is carried by a label.
+LABEL maintainer="Sébastien Santoro aka Dereckson <dereckson+nasqueron-docker@espace-win.org>"
+
+# Release stage: runtime packages only, then the binaries from the builder.
+RUN apt-get update && \
+    apt-get install -y unzip libpq5 ca-certificates && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY --from=builder \
+    /opt/datasources/target/release/fantoir-datasource \
+    /opt/datasources/target/release/language-subtag-registry-datasource \
+    /opt/datasources/target/release/rfc-datasource \
+    /usr/local/bin/
diff --git a/Makefile b/Makefile
new file mode 100644
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,51 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+PREFIX=/usr/local
+
+CARGO=${HOME}/.cargo/bin/cargo
+INSTALL=install
+RM=rm -rf
+
+# -------------------------------------------------------------
+# Main targets
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+# These targets are commands, not files. target/release is listed too:
+# it is a directory, so once it exists make would consider it up to date
+# and never call cargo again. Cargo handles incremental builds itself.
+.PHONY: all build test clean clean-all install target/release
+
+all: build
+
+build: target/release
+
+test:
+	RUST_TEST_THREADS=1 ${CARGO} test
+
+clean:
+	${RM} target
+
+clean-all:
+	${CARGO} clean
+
+install: ${PREFIX}/bin/fantoir-datasource ${PREFIX}/bin/language-subtag-registry-datasource ${PREFIX}/bin/rfc-datasource
+
+# -------------------------------------------------------------
+# Build
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+target/release:
+ ${CARGO} build --release
+
+# -------------------------------------------------------------
+# Install
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+${PREFIX}/bin/fantoir-datasource:
+ ${INSTALL} target/release/fantoir-datasource ${PREFIX}/bin/
+
+${PREFIX}/bin/language-subtag-registry-datasource:
+ ${INSTALL} target/release/language-subtag-registry-datasource ${PREFIX}/bin/
+
+${PREFIX}/bin/rfc-datasource:
+ ${INSTALL} target/release/rfc-datasource ${PREFIX}/bin/
diff --git a/_pipelines/README.md b/_pipelines/README.md
new file mode 100644
--- /dev/null
+++ b/_pipelines/README.md
@@ -0,0 +1,14 @@
+## Nasqueron Datasources :: pipelines
+
+The dags/ directory contains DAG pipelines, defined as code, for Apache Airflow.
+
+Those pipelines can be used:
+
+ - at Nasqueron, on our Airflow instance
+ - elsewhere, as sample documentation on how to use our datasources
+ components and how to glue them together
+
+The nasqueron_datasources module is published to the dags folder too,
+so it is available from the different DAGs. It contains helper methods.
+
+Unit tests are available in the tests/ folder.
diff --git a/_pipelines/dags/fantoir_fetch.py b/_pipelines/dags/fantoir_fetch.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_fetch.py
@@ -0,0 +1,115 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > fetch
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+import json
+import requests
+
+from airflow.decorators import dag, task
+from airflow.models import Variable, TaskInstance
+from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+
+from nasqueron_datasources.pipelines.commands import run, parse_environment
+from nasqueron_datasources.pipelines.errors import CommandException, WorkflowException
+
+
+NOTIFICATION_URL = "https://notifications.nasqueron.org/gate/Notification/Nasqueron"
+
+
+@dag(
+    dag_id="fantoir_fetch",
+    schedule=None,
+    start_date=datetime(2023, 1, 1),
+    tags=["datasources", "fantoir", "download", "external"],
+)
+def fantoir_fetch_dag():
+    """
+    ### Pipeline for FANTOIR datasource - fetch
+
+    This pipeline checks if a new version of FANTOIR file is published.
+
+    If so it downloads it, extracts it and calls import DAG.
+
+    Reference: https://agora.nasqueron.org/Fantoir-datasource
+    """
+
+    @task
+    def fetch() -> dict:
+        """Fetches FANTOIR from data.economie.gouv.fr, if a new version is available."""
+        exit_code, stdout, stderr = run(
+            ["fantoir-datasource", "fetch"],
+            cwd=Variable.get("fantoir_directory"),
+            env={
+                "DATABASE_URL": "",  # a value is unneeded for fetch operation
+            },
+        )
+
+        if exit_code == 12:
+            # No new version available
+            return {
+                "new_version": False,
+                "environment": {},
+            }
+
+        if exit_code != 0:
+            # Failure
+            raise CommandException("Can't fetch FANTOIR", exit_code, stderr)
+
+        return {
+            "new_version": True,
+            "environment": parse_environment(stdout),
+        }
+
+    def is_new_version_available(task_instance: TaskInstance) -> bool:
+        """Reads the "new_version" value pushed by the fetch task."""
+        return task_instance.xcom_pull(task_ids="fetch", key="new_version")
+
+    check_fetch = ShortCircuitOperator(
+        task_id="check_fetch",
+        python_callable=is_new_version_available,
+        doc_md="""Determine if a new version is available from previous task.""",
+    )
+
+    # Triggered by fantoir_fetch DAG, as a new version is available.
+    call_import_dag = TriggerDagRunOperator(
+        task_id="call_import_dag",
+        trigger_dag_id="fantoir_import",
+        conf={
+            "fantoir_environment": "{{ task_instance.xcom_pull(task_ids='fetch', key='environment') }}"
+        },
+        doc_md="""Launch the workflow to import FANTOIR new version""",
+    )
+
+    @task
+    def notify(task_instance: TaskInstance):
+        """
+        Sends a notification that a new version is available.
+
+        :raises WorkflowException: if the notification endpoint does not reply with HTTP 200
+        """
+
+        fantoir_file = task_instance.xcom_pull(task_ids="fetch", key="environment").get(
+            "FANTOIR_FILE", "(unknown)"
+        )
+        # The keyword accepted by xcom_pull is `task_ids`, as for the calls above;
+        # `task_id` is not a parameter of that method.
+        dag_run_id = task_instance.xcom_pull(
+            task_ids="call_import_dag", key="trigger_run_id"
+        )
+        notification = {
+            "service": "Airflow",
+            "project": "Nasqueron",
+            "group": "Datasources",
+            "type": "fantoir-fetch",
+            "text": f"A new version of FANTOIR has been fetched: {fantoir_file}. Triggering import workflow: {dag_run_id}.",
+        }
+
+        # A timeout avoids blocking the worker forever on an unresponsive server.
+        response = requests.post(
+            NOTIFICATION_URL, data=json.dumps(notification), timeout=30
+        )
+        if response.status_code != 200:
+            raise WorkflowException(
+                "Can't send notification: HTTP error " + str(response.status_code)
+            )
+
+    fetch() >> check_fetch >> call_import_dag >> notify()
+
+
+fantoir_fetch_dag()
diff --git a/_pipelines/dags/fantoir_import.py b/_pipelines/dags/fantoir_import.py
new file mode 100755
--- /dev/null
+++ b/_pipelines/dags/fantoir_import.py
@@ -0,0 +1,108 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Pipeline: Datasources > FANTOIR > import
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+from datetime import datetime
+
+from airflow.decorators import dag, task
+from airflow.models import Connection, Variable
+
+from nasqueron_datasources.pipelines.commands import run
+
+
+@dag(
+    dag_id="fantoir_import",
+    schedule=None,
+    start_date=datetime(2023, 1, 1),
+    tags=["datasources", "fantoir", "postgresql", "external"],
+)
+def fantoir_import_dag():
+    """
+    ### Pipeline for FANTOIR datasource - import
+
+    This pipeline imports FANTOIR into PostgreSQL, enriches it
+    and promotes the new table as the one to use.
+
+    Enrichment is done by fetching information from:
+    - Wikidata
+
+    Reference: https://agora.nasqueron.org/Fantoir-datasource
+    """
+
+    # Imported here so the DAG only relies on names it brings into scope itself.
+    from nasqueron_datasources.pipelines.errors import CommandException
+
+    fantoir_directory = Variable.get("fantoir_directory")
+    database_url = Connection.get_connection_from_secrets("postgresql_fantoir").get_uri()
+
+    def run_fantoir(arguments, failure_message):
+        """
+        Runs `fantoir-datasource` with the specified arguments.
+
+        :param arguments: the subcommand and its arguments, as a list
+        :param failure_message: the message to use if the command fails
+        :raises CommandException: if the command exits with a non-zero code
+        """
+        exit_code, _, stderr = run(
+            ["fantoir-datasource", *arguments],
+            cwd=fantoir_directory,
+            env={
+                "DATABASE_URL": database_url,
+            },
+        )
+
+        # Without this check, the task would succeed even if the command failed.
+        if exit_code != 0:
+            raise CommandException(failure_message, exit_code, stderr)
+
+    @task
+    def import_to_pgsql():
+        """Runs `fantoir-datasource import`."""
+        # NOTE(review): those Jinja placeholders are plain string literals in
+        # a Python callable; Airflow looks to render templates only in operator
+        # templated fields, so they would reach the command unrendered. Also,
+        # fantoir_fetch passes its values under conf["fantoir_environment"],
+        # not under params. To confirm and fix against a live Airflow.
+        run_fantoir(
+            [
+                "import",
+                "{{ params['FANTOIR_FILE'] }}",
+                "{{ params['FANTOIR_TABLE'] }}",
+                "-c",
+            ],
+            "Can't import FANTOIR",
+        )
+
+    @task
+    def enrich_from_wikidata():
+        """Runs `fantoir-datasource wikidata`."""
+        run_fantoir(["wikidata"], "Can't enrich FANTOIR from Wikidata")
+
+    @task
+    def promote():
+        """Runs `fantoir-datasource promote`."""
+        run_fantoir(["promote"], "Can't promote FANTOIR table")
+
+    @task
+    def publish_to_configuration():
+        """
+        NOT IMPLEMENTED.
+
+        Publish new table name to use to etcd/consul
+        """
+        pass
+
+    @task
+    def notify():
+        """
+        NOT IMPLEMENTED.
+
+        Send notification payload to Notifications Center
+        """
+        pass
+
+    (
+        import_to_pgsql()
+        >> [
+            # Enrichment sources can run in //.
+            enrich_from_wikidata(),
+        ]
+        >> promote()
+        >> [
+            # Post-action tasks can run in // too.
+            publish_to_configuration(),
+            notify(),
+        ]
+    )
+
+
+fantoir_import_dag()
diff --git a/_pipelines/dags/nasqueron_datasources/__init__.py b/_pipelines/dags/nasqueron_datasources/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py b/_pipelines/dags/nasqueron_datasources/pipelines/__init__.py
new file mode 100644
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/commands.py b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/commands.py
@@ -0,0 +1,65 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines :: command utilities
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# Description: Helpers to handle commands in Python pipelines
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+import os
+import subprocess
+
+
+# -------------------------------------------------------------
+# Subprocess wrappers
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def run(command, cwd=None, env=None):
+    """
+    Runs the specified command and return exit_code, stdout, stderr.
+
+    The PATH of the current process is passed to the command,
+    unless the caller provides its own PATH in env.
+
+    :type env: dict|None
+    :param env: The environment variables to pass to the software
+    :type command: string|list
+    :param command: The command to run, as a string to pass to shell (to avoid) or a list [command, arg1, arg2, ...]
+    :param cwd: The working directory for the command to run
+
+    :return: (exit_code, stdout, stderr), where stdout and stderr are lists of lines
+    """
+    # Work on a copy, so the dictionary of the caller is never modified.
+    env = {} if env is None else dict(env)
+
+    # The key is "PATH": a lowercase "path" lookup never matches it,
+    # which would overwrite any PATH provided by the caller.
+    if "PATH" not in env:
+        env["PATH"] = os.environ.get("PATH", os.defpath)
+
+    # A string is passed to the shell, a list is executed directly.
+    shell = isinstance(command, str)
+    process = subprocess.run(
+        command, shell=shell, cwd=cwd, env=env, capture_output=True
+    )
+
+    stdout = process.stdout.decode("utf-8").split("\n")
+    stderr = process.stderr.decode("utf-8").split("\n")
+
+    return process.returncode, stdout, stderr
+
+
+# -------------------------------------------------------------
+# Environment
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
+
+def parse_environment(environment_lines):
+    """
+    Parses environment as a dictionary.
+
+    This method is intended to be used with `env`, with .env files,
+    or with any command offering a similar format:
+
+        VARIABLE=value
+
+    Only the first "=" separates the name from the value, so a value
+    can contain "=" too. Blank lines and lines without "=" are ignored.
+
+    :param environment_lines: An iterable of lines, e.g. a list or a file
+    :return: A dictionary of variable names to values, all as strings
+    """
+    environment = {}
+
+    for line in environment_lines:
+        name, separator, value = line.strip().partition("=")
+
+        # Skips what isn't an assignment, like the empty string left
+        # after the final newline when an output is split on "\n".
+        if not separator:
+            continue
+
+        environment[name] = value
+
+    return environment
diff --git a/_pipelines/dags/nasqueron_datasources/pipelines/errors.py b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/dags/nasqueron_datasources/pipelines/errors.py
@@ -0,0 +1,19 @@
+# -------------------------------------------------------------
+# Nasqueron Datasources :: pipelines : errors
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Project: Nasqueron
+# License: BSD-2-Clause
+# -------------------------------------------------------------
+
+
+class WorkflowException(Exception):
+    """Exception carrying a message, parent of the pipelines errors defined here."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class CommandException(WorkflowException):
+    """Exception whose message combines a description, an exit code and stderr."""
+
+    def __init__(self, message, exit_code, stderr):
+        details = f"{message} (exit code {exit_code}): {stderr}"
+        super().__init__(details)
diff --git a/_pipelines/requirements.txt b/_pipelines/requirements.txt
new file mode 100644
--- /dev/null
+++ b/_pipelines/requirements.txt
@@ -0,0 +1,2 @@
+apache-airflow~=2.8.0
+requests~=2.28.2
diff --git a/_pipelines/tests/files/env b/_pipelines/tests/files/env
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/files/env
@@ -0,0 +1,3 @@
+FOO=This is a sentence.
+QUUX=666
+BAR=
diff --git a/_pipelines/tests/test_commands.py b/_pipelines/tests/test_commands.py
new file mode 100644
--- /dev/null
+++ b/_pipelines/tests/test_commands.py
@@ -0,0 +1,18 @@
+from nasqueron_datasources.pipelines import commands
+import unittest
+
+
+class TestCommands(unittest.TestCase):
+    def test_parse_environment(self):
+        """Checks the files/env fixture is parsed as a dictionary of strings."""
+        import os
+
+        expected = {
+            "FOO": "This is a sentence.",
+            "QUUX": "666",  # everything is parsed as a string
+            "BAR": "",  # an empty string is used instead of None for empty values
+        }
+
+        # The fixture is resolved from the location of this file, so the test
+        # doesn't depend on the directory it is launched from.
+        tests_directory = os.path.dirname(os.path.abspath(__file__))
+        fixture_path = os.path.join(tests_directory, "files", "env")
+
+        with open(fixture_path) as fd:
+            self.assertDictEqual(expected, commands.parse_environment(fd))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/fantoir-datasource/src/commands/fetch/fantoir_file.rs b/fantoir-datasource/src/commands/fetch/fantoir_file.rs
--- a/fantoir-datasource/src/commands/fetch/fantoir_file.rs
+++ b/fantoir-datasource/src/commands/fetch/fantoir_file.rs
@@ -34,8 +34,8 @@
pub fn get_file_candidates(&self) -> Vec<String> {
let previous_month = self.date - Months::new(1);
vec![
- format!("FANTOIR{}{}", previous_month.month(), previous_month.year() - 2000),
- format!("FANTOIR{}{}", self.date.month(), self.date.year() - 2000),
+ format!("FANTOIR{:02}{:02}", previous_month.month(), previous_month.year() - 2000),
+ format!("FANTOIR{:02}{:02}", self.date.month(), self.date.year() - 2000),
]
}
diff --git a/fantoir-datasource/src/commands/fetch/mod.rs b/fantoir-datasource/src/commands/fetch/mod.rs
--- a/fantoir-datasource/src/commands/fetch/mod.rs
+++ b/fantoir-datasource/src/commands/fetch/mod.rs
@@ -66,7 +66,7 @@
let month: i8 = filename[7..=8].parse().unwrap();
let year = 2000 + filename[9..=10].parse::<i32>().unwrap();
- format!("fantoir_{}{}", year, month)
+ format!("fantoir_{}{:02}", year, month)
}
/// Determines a temporary location where to save the FANTOIR file ZIP archive

File Metadata

Mime Type
text/plain
Expires
Tue, Apr 22, 04:24 (18 h, 57 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2594239
Default Alt Text
D2754.id8458.diff (16 KB)

Event Timeline