Page MenuHomeDevCentral

Run fantoir-datasource as Airflow pipeline
Needs ReviewPublic

Authored by dereckson on Jan 24 2023, 01:09.
Tags
None
Referenced Files
Unknown Object (File)
Tue, Jan 7, 16:17
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Unknown Object (File)
Mon, Jan 6, 03:07
Subscribers
None

Details

Reviewers
dereckson
Maniphest Tasks
T1750: Import FANTOIR database
Summary

Provide a _pipelines component for Apache Airflow
to run workflows as DAGs.

The fantoir_fetch pipeline will run fantoir-datasource fetch.

When a new version is available, the fetch pipeline will run the fantoir_import
pipeline, to completethe import with import, wikidata and promote.

Pipelines are stored in the dags/ directory,
Python helper code in the dags/nasqueron_datasources/pipelines directory.

Ref T1750

Test Plan

Run workflows

Diff Detail

Repository
rDS Nasqueron Datasources
Lint
No Lint Coverage
Unit
No Test Coverage
Branch
pipelines
Build Status
Buildable 5202
Build 5483: arc lint + arc unit

Event Timeline

dereckson created this revision.
dereckson added inline comments.
_pipelines/dags/fantoir_fetch.py
21

2 lines

_pipelines/tests/test_commands.py
2

Header is missing

dereckson marked an inline comment as not done.Jan 13 2024, 14:32
dereckson added inline comments.
_pipelines/dags/fantoir_fetch.py
22

gate name is "Notification" (singular)

_pipelines/dags/fantoir_import.py
37

We should mount in containers a working directory for that purpose. Currently we have the following directories defined in /srv/airflow/nasqueron and mounted in containers:

  • dags to host this change content
  • logs for dag_processor_manager and scheduler logs
  • plugins, not currently in use

But we don't have a workspace directory that could be share among containers running this DAG code.

Plan is:

  • create /srv/airflow/{{ realm }}/workspace
  • mount it somewhere, for example /home/airflow/workspace
  • define variable fantoir_directory as /home/airflow/workspace/fantoir
  • ask DAG to create it or give instructions to Salt to create it
38

airflow connections get postgresql_fantoir now works, so probably Connection.get() should work too.

_pipelines/requirements.txt
2

We've upgraded to 2.8.0 in rDAFc6b6da7ceafb

Use fantoir_YYYMM format to suggest FANTOIR table name

Fixes T1941.

2.8.0 / add header / Notification singular / database_url

At the end of the TaskFlow DAG, function should be called, for example at the end of fantoir_fetch.py we need to add fantoir_fetch_dag().

Tested under Airflow 2.8.0, notes is an invalid argument.

airflow@4c3c353e2f0c:/opt/airflow$ python /opt/airflow/dags/fantoir_fetch.py
Traceback (most recent call last):
  File "/opt/airflow/dags/fantoir_fetch.py", line 113, in <module>
    fantoir_fetch_dag()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 3941, in factory
    f(**f_kwargs)
  File "/opt/airflow/dags/fantoir_fetch.py", line 77, in fantoir_fetch_dag
    call_import_dag = TriggerDagRunOperator(
                      ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/trigger_dagrun.py", line 122, in __init__
    super().__init__(**kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 798, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to TriggerDagRunOperator (task_id: call_import_dag). Invalid arguments were:
**kwargs: {'notes': 'Triggered by fantoir_fetch DAG, as a new version is available.'}

TaskFlow DAGs are defined by a function, but that function needs to be called.

As such, there are valid Python executables, hence the chmod 755.

With that, there are correctly detected by Airflow on restart.

Create table if it doesn't exist

  • Notification gate is singular
  • Get FANTOIR connection from secret back-end
  • Inject PATH in environment so we can find fantoir-datasource in /usr/local/bin
  • Decode bytes and split lines into a list for stdout, stderr

Rebase. Bump Airflow version.

dereckson marked an inline comment as not done.Jan 14 2024, 22:08
dereckson planned changes to this revision.EditedJan 14 2024, 22:16

Currently, the fetch pipeline fails at the notify step:

dag_run_id = task_instance.xcom_pull(
    task_id="call_import_dag", key="trigger_run_id"
)

Probably task_ids as the code just above works, and XCOM variable is resolved according sentry.

From https://sentry.nasqueron.org/organizations/airflow/issues/73:

TypeError
TaskInstance.xcom_pull() got an unexpected keyword argument 'task_id'

task_ids for task_instance.xcom_pull

Notification successfully reached the Notifications Center. Here an extract of the event through Sentry discover:

image.png (738×1 px, 111 KB)

It's been processed by Wearg on IRC and ignored, as group is unknown:

22:37:50 <Wearg> [DEBUG] Message for unknown group: Nasqueron Datasources