139 changes: 139 additions & 0 deletions content/en/data_observability/jobs_monitoring/airflow.md
@@ -360,6 +360,8 @@

### Link your dbt jobs with Airflow tasks

<div class="alert alert-info">On Airflow 2.9.2 with <code>apache-airflow-providers-openlineage</code> 2.2.0 (the latest provider version enabled for this Airflow version), the <code>lineage_root_*</code> macros required for root-parent linking are not available. To use them anyway, see <a href="#backport-openlineage-lineage-macros-for-older-provider-versions">Backport OpenLineage lineage macros for older provider versions</a>.</div>

Check notice on line 363 in content/en/data_observability/jobs_monitoring/airflow.md (GitHub Actions / vale, Datadog.sentencelength): Try to keep your sentence length to 25 words or fewer.
Contributor review comment:

Is this note needed here? This section does not contain any references to the `lineage_root_*` macros.


You can monitor dbt jobs that run in Airflow by connecting the dbt telemetry with the respective Airflow tasks, using the [OpenLineage dbt integration][6].

To see the link between Airflow tasks and dbt jobs, follow these steps:
@@ -394,6 +396,8 @@

### Link your Spark jobs with Airflow tasks

<div class="alert alert-info">The <code>lineage_root_*</code> macros require <code>apache-airflow-providers-openlineage</code> 2.3.0 or later. On older provider versions (for example, Airflow 2.9.2 with provider 2.2.0), see <a href="#backport-openlineage-lineage-macros-for-older-provider-versions">Backport OpenLineage lineage macros for older provider versions</a>.</div>

The OpenLineage integration can automatically inject Airflow's parent job information (namespace, job name, run ID) into Spark application properties. This creates a parent-child relationship between Airflow tasks and Spark jobs, enabling you to troubleshoot both systems in one place.

**Note**: This feature requires `apache-airflow-providers-openlineage` version 2.1.0 or later (supported from Airflow 2.9+).
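
As a rough illustration of what the injection amounts to, the sketch below assembles the three parent properties by hand and flattens them into `--conf` arguments. The helper names (`parent_job_conf`, `as_spark_submit_args`) and the sample values are hypothetical and are not part of the provider; only the `spark.openlineage.parent*` property names come from this page.

```python
def parent_job_conf(namespace: str, job_name: str, run_id: str) -> dict[str, str]:
    """Return the Spark properties that identify the Airflow parent job."""
    return {
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }


def as_spark_submit_args(conf: dict[str, str]) -> list[str]:
    """Flatten a property mapping into repeated `--conf key=value` arguments."""
    args: list[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical sample values; at runtime Airflow supplies the real ones.
conf = parent_job_conf(
    namespace="default",
    job_name="my_dag.my_spark_task",
    run_id="01890000-0000-7000-8000-000000000000",
)
print(as_spark_submit_args(conf))
```

With automatic injection enabled, you do not need to write anything like this yourself; the sketch only shows the shape of the properties that end up on the Spark application.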
@@ -410,6 +414,141 @@

This automatically injects parent job properties for all supported Spark Operators. To disable it for specific operators, set `openlineage_inject_parent_job_info=False` on the operator.

#### Manually inject parent job information for unsupported operators

For operators that do not support automatic injection (for example, a `BashOperator` running the AWS CLI to start an Amazon EMR Serverless job), use the [OpenLineage Airflow lineage macros][4] to pass parent and root-parent identifiers as Spark configuration properties:

Check notice on line 419 in content/en/data_observability/jobs_monitoring/airflow.md (GitHub Actions / vale, Datadog.sentencelength): Try to keep your sentence length to 25 words or fewer.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="emr_serverless_with_openlineage_parent",
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    submit = BashOperator(
        task_id="start_emr_serverless_job",
        # Airflow renders the Jinja macros below before running the command.
        # sparkSubmitParameters stays on one line because a JSON string cannot
        # contain raw newlines or backslash-newline continuations.
        bash_command=r"""
        set -euo pipefail

        aws emr-serverless start-job-run \
          --application-id <APPLICATION_ID> \
          --execution-role-arn <EXECUTION_ROLE_ARN> \
          --name <JOB_NAME> \
          --region <AWS_REGION> \
          --mode STREAMING \
          --job-driver '{"sparkSubmit": {
            "entryPoint": "<JAR_URL>",
            "entryPointArguments": ["--config-key", "<CONFIG_KEY>", "--stage", "<STAGE>"],
            "sparkSubmitParameters": "--conf spark.app.name=<APP_NAME> --conf spark.openlineage.parentJobNamespace={{ lineage_job_namespace() }} --conf spark.openlineage.parentJobName={{ lineage_job_name(task_instance) }} --conf spark.openlineage.parentRunId={{ lineage_run_id(task_instance) }} --conf spark.openlineage.rootParentJobNamespace={{ lineage_root_job_namespace(task_instance) }} --conf spark.openlineage.rootParentJobName={{ lineage_root_job_name(task_instance) }} --conf spark.openlineage.rootParentRunId={{ lineage_root_run_id(task_instance) }}"
          }}'
        """,
    )
```
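
Because JSON string values cannot contain raw line breaks, one option is to assemble the `--job-driver` payload in Python instead of writing the nested JSON by hand. The following is a minimal sketch under that assumption: `build_job_driver` and `PARENT_CONF` are hypothetical names, and the placeholders are the same ones used on this page. The Jinja expressions are kept as literal text so that Airflow can render them when it evaluates the templated `bash_command`.

```python
import json
import shlex

# Jinja expressions stay literal here; Airflow renders them at task runtime.
PARENT_CONF = {
    "spark.openlineage.parentJobNamespace": "{{ lineage_job_namespace() }}",
    "spark.openlineage.parentJobName": "{{ lineage_job_name(task_instance) }}",
    "spark.openlineage.parentRunId": "{{ lineage_run_id(task_instance) }}",
}


def build_job_driver(entry_point, entry_point_args, spark_conf):
    """Serialize a sparkSubmit job driver with all --conf flags on one line."""
    params = " ".join(f"--conf {key}={value}" for key, value in spark_conf.items())
    driver = {
        "sparkSubmit": {
            "entryPoint": entry_point,
            "entryPointArguments": list(entry_point_args),
            "sparkSubmitParameters": params,
        }
    }
    return json.dumps(driver)


job_driver = build_job_driver("<JAR_URL>", ["--stage", "<STAGE>"], PARENT_CONF)

# shlex.quote wraps the JSON in single quotes so the shell passes it through intact.
bash_command = (
    "aws emr-serverless start-job-run "
    "--application-id <APPLICATION_ID> "
    "--execution-role-arn <EXECUTION_ROLE_ARN> "
    f"--job-driver {shlex.quote(job_driver)}"
)
print(bash_command)
```

The resulting string can be passed as `bash_command` to a `BashOperator`. This sketch assumes the rendered macro values contain no double quotes or backslashes, which would otherwise need JSON escaping after rendering.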

### Backport OpenLineage lineage macros for older provider versions

The `lineage_root_job_namespace`, `lineage_root_job_name`, and `lineage_root_run_id` macros that emit the outermost DAG identifiers for root-parent linking were added in `apache-airflow-providers-openlineage` 2.3.0. If you cannot upgrade the provider (for example, Amazon MWAA on Airflow 2.9.2 pins provider 2.2.0), define the missing macros as `user_defined_macros` on the DAG:

Check notice on line 464 in content/en/data_observability/jobs_monitoring/airflow.md (GitHub Actions / vale, Datadog.sentencelength): Try to keep your sentence length to 25 words or fewer.

```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage import conf as ol_conf
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter


def _logical_date(ti):
    # Prefer `logical_date` when the task instance exposes it;
    # otherwise fall back to `execution_date`.
    return getattr(ti, "logical_date", None) or ti.execution_date


def _clear_number(ti) -> int:
    return ti.get_dagrun().clear_number


def _root_from_conf(ti, key):
    # Read a root-parent value forwarded through dag_run.conf["openlineage"], if any.
    ol = (ti.get_dagrun().conf or {}).get("openlineage") or {}
    return ol.get(key) or None


def lineage_job_namespace():
    return ol_conf.namespace()


def lineage_job_name(ti):
    return f"{ti.dag_id}.{ti.task_id}"


def lineage_run_id(ti):
    return OpenLineageAdapter.build_task_instance_run_id(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        try_number=ti.try_number,
        logical_date=_logical_date(ti),
        map_index=ti.map_index,
    )


def lineage_root_job_namespace(ti):
    return _root_from_conf(ti, "rootParentJobNamespace") or ol_conf.namespace()


def lineage_root_job_name(ti):
    return _root_from_conf(ti, "rootParentJobName") or ti.dag_id


def lineage_root_run_id(ti):
    # A forwarded root run ID wins; otherwise this DAG run is the root.
    forwarded = _root_from_conf(ti, "rootParentRunId")
    if forwarded:
        return forwarded
    return OpenLineageAdapter.build_dag_run_id(
        dag_id=ti.dag_id,
        logical_date=_logical_date(ti),
        clear_number=_clear_number(ti),
    )


# Macro names exposed to Jinja templates in this DAG.
OL_MACROS = {
    "lineage_job_namespace": lineage_job_namespace,
    "lineage_job_name": lineage_job_name,
    "lineage_run_id": lineage_run_id,
    "lineage_root_job_namespace": lineage_root_job_namespace,
    "lineage_root_job_name": lineage_root_job_name,
    "lineage_root_run_id": lineage_root_run_id,
}


with DAG(
    dag_id="<YOUR_DAG_ID>",
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,
    catchup=False,
    user_defined_macros=OL_MACROS,
) as dag:
    submit = BashOperator(
        task_id="<YOUR_TASK_ID>",
        bash_command="...",
    )
```

With `user_defined_macros` set on the DAG, the `{{ lineage_*() }}` and `{{ lineage_root_*() }}` calls in your task templates resolve to values that match the built-in macros shipped with provider 2.3.0+, so downstream Spark or dbt jobs can link to the Airflow root parent in Datadog.
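
The backported root macros first look for values forwarded under the `openlineage` key of the DAG run's `conf`, and only fall back to the current DAG when nothing was forwarded. The sketch below isolates that precedence using stand-in objects, so it runs without Airflow. `fake_ti` and `resolve_root_job_name` are hypothetical names for illustration, and how an upstream DAG populates that `conf` key is an assumption about your setup.

```python
from types import SimpleNamespace


def root_from_conf(dag_run_conf, key):
    """Return a forwarded root value from conf["openlineage"], if any."""
    ol = (dag_run_conf or {}).get("openlineage") or {}
    return ol.get(key) or None


def resolve_root_job_name(ti):
    """Prefer a forwarded root job name; otherwise fall back to this DAG's ID."""
    return root_from_conf(ti.get_dagrun().conf, "rootParentJobName") or ti.dag_id


def fake_ti(dag_id, conf):
    """Build a stand-in task instance exposing only what the resolver reads."""
    dag_run = SimpleNamespace(conf=conf)
    return SimpleNamespace(dag_id=dag_id, get_dagrun=lambda: dag_run)


# A DAG run started on its own: no forwarded root, so the DAG is its own root.
standalone = fake_ti("child_dag", conf=None)

# A DAG run whose conf carries a root forwarded by a (hypothetical) parent DAG.
triggered = fake_ti(
    "child_dag",
    conf={"openlineage": {"rootParentJobName": "parent_dag"}},
)

print(resolve_root_job_name(standalone))  # child_dag
print(resolve_root_job_name(triggered))   # parent_dag
```

The same precedence applies to `rootParentJobNamespace` and `rootParentRunId` in the backport.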

Check notice on line 550 in content/en/data_observability/jobs_monitoring/airflow.md (GitHub Actions / vale, Datadog.sentencelength): Try to keep your sentence length to 25 words or fewer.


## Further Reading
