This guide assumes familiarity with Python programming and data pipeline concepts. We'll progressively build from core concepts to advanced implementation patterns.
Table of Contents
- Introduction to Apache Airflow
- Core Concepts
- Setting Up Your Airflow Environment
- DAG Fundamentals
- Operators in Depth
- XComs: Passing Data Between Tasks
- Sensors and Hooks
- Dynamic DAG Generation
- Testing and Debugging
- Performance Optimization
- Security Best Practices
- Production Deployment
- Advanced Patterns
- Troubleshooting Common Issues
- Resources and Further Learning
Introduction to Apache Airflow
Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. Developed at Airbnb in 2014 and later contributed to the Apache Software Foundation, Airflow has become the de facto standard for orchestrating complex computational workflows and data processing pipelines.
Why Airflow?
Traditional scheduling tools like cron lack visibility, dependency management, and error handling capabilities required for complex data pipelines. Airflow addresses these limitations by providing:
- Programmatic workflow creation - Define workflows as Python code
- Visualization of pipeline dependencies - DAG view shows relationships between tasks
- Extensible architecture - Easy integration with various systems via operators and hooks
- Scalable execution model - Distribute task execution across workers
- Rich UI - Monitor, troubleshoot, and manage workflows
Airflow excels in scenarios requiring complex dependencies between tasks and integration with multiple systems. For simpler workflows, lighter solutions might be more appropriate.
Airflow’s Architecture
Airflow operates with several key components:
- Webserver: Provides the UI for monitoring and managing DAGs and tasks
- Scheduler: Orchestrates task execution based on dependencies and schedules
- Executor: Determines how tasks get run (locally, on remote workers, etc.)
- Metadata Database: Stores DAG and task execution metadata
- DAG Directory: Contains Python files defining workflow DAGs
The architecture follows a message-passing paradigm where the scheduler identifies tasks ready for execution and passes them to the executor, which distributes them to workers.
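For orientation, here is a minimal sketch of the configuration settings that tie these components together (the values are only illustrative; the real file lives at $AIRFLOW_HOME/airflow.cfg):

# airflow.cfg (illustrative excerpt)
[core]
dags_folder = /opt/airflow/dags    # the DAG Directory scanned by the scheduler
executor = LocalExecutor           # which Executor runs tasks

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres/airflow   # Metadata Database

[webserver]
web_server_port = 8080             # port served by the Webserver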
Core Concepts
To effectively work with Airflow, you need to understand several fundamental concepts:
DAGs (Directed Acyclic Graphs)
A DAG represents a workflow as a collection of tasks with dependencies between them. The “directed” aspect means that data flows in one direction, while “acyclic” means there are no cycles (a task cannot depend on itself directly or indirectly).
DAGs in Airflow are defined in Python, allowing for dynamic generation and complex logic in workflow construction.
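In code, "directed" and "acyclic" simply mean that dependencies flow one way with no loops. A minimal sketch (the task names are hypothetical; full DAG boilerplate appears in the sections below):

# extract feeds two transforms, which both feed a single load task - a diamond with no cycles
extract >> [transform_a, transform_b]
[transform_a, transform_b] >> load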
Tasks
Tasks are the units of execution in Airflow. Each task represents a piece of work to be done and is implemented using an Operator. Tasks are arranged within a DAG to form the workflow.
Operators
Operators determine what gets done by a task. Airflow provides many built-in operators:
- BashOperator: Executes bash commands
- PythonOperator: Calls Python functions
- SQLOperator variants: Execute SQL queries
- Sensor Operators: Wait for conditions to be met
- Transfer Operators: Move data between systems
Task Instances
A task instance represents a specific run of a task at a point in time. It has its own state (running, success, failed, etc.) and execution context.
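You can inspect a task instance's state from the CLI; a quick sketch using the example DAG and task IDs introduced later in this guide:

# Show the state of one task instance for a given logical date
airflow tasks state example_dag print_date 2023-01-01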
Task Dependencies
Dependencies define the order in which tasks should run. In Airflow, you can use multiple methods to establish dependencies:
# Method 1: Using the >> operator
task1 >> task2 >> task3
# Method 2: Using the << operator
task3 << task2 << task1
# Method 3: Using the set_upstream/set_downstream methods
task1.set_downstream(task2)
task2.set_downstream(task3)
Execution Date and Data Intervals
Each DAG run is associated with a logical execution date (in Airflow 2.2+, this concept was expanded to data intervals). This doesn’t necessarily represent when the DAG runs but rather what data period the run is processing.
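A minimal sketch of reading these values inside a task (using the TaskFlow API introduced later; data_interval_start/end are available from Airflow 2.2 onward):

from airflow.decorators import task

@task
def show_interval(**context):
    # For a daily schedule, the run covering 2023-01-01 executes after 2023-01-02 00:00
    print(context['logical_date'])          # start of the interval being processed
    print(context['data_interval_start'])   # lower bound of the data interval
    print(context['data_interval_end'])     # upper bound of the data interval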
Setting Up Your Airflow Environment
There are several ways to set up Airflow, depending on your needs:
Method 1: Using pip (Development Environment)
# Create a virtual environment
python -m venv airflow-env
source airflow-env/bin/activate # On Windows: airflow-env\Scripts\activate
# Install Airflow with extras (for reproducible installs, add the matching constraints file from the Airflow docs)
pip install "apache-airflow[postgres,amazon,http]==2.7.1"
# Set the Airflow home directory
export AIRFLOW_HOME=~/airflow
# Initialize the database
airflow db init
# Create a user
airflow users create \
--username admin \
--password admin \
--firstname First \
--lastname Last \
--role Admin \
--email admin@example.com
# Start the webserver and scheduler (run each in its own terminal)
airflow webserver --port 8080
airflow scheduler
Method 2: Using Docker Compose (Recommended for Development)
Create a docker-compose.yml file:
version: '3'
services:
postgres:
image: postgres:13
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
webserver:
image: apache/airflow:2.7.1
depends_on:
- postgres
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY:-}
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
- AIRFLOW__CORE__LOAD_EXAMPLES=false
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
scheduler:
image: apache/airflow:2.7.1
depends_on:
- postgres
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY:-}
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
- AIRFLOW__CORE__LOAD_EXAMPLES=false
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: scheduler
volumes:
postgres-db-volume:
Then run:
# Generate a Fernet key
pip install cryptography
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Set the key as an environment variable
export FERNET_KEY=<your_generated_key>
# Start the services (this minimal compose file assumes the metadata database has already
# been initialized; the official docker-compose.yaml adds an airflow-init service for that)
docker-compose up -d
Method 3: Using Kubernetes with Helm (Production)
For production deployments, Kubernetes with the official Helm chart is recommended. This is beyond the scope of this basic setup guide, but you can refer to the Airflow Helm chart documentation.
DAG Fundamentals
Let’s create a simple DAG to understand the basic structure:
"""
Example DAG demonstrating basic Airflow concepts.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Define default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Instantiate the DAG
dag = DAG(
'example_dag',
default_args=default_args,
description='A simple example DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['example'],
)
# Define tasks
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
def print_context(**context):
"""Print the Airflow context and ds variable from the context."""
print(f"Execution date: {context['ds']}")
print(f"Logical date: {context['logical_date']}")
return 'Task completed successfully!'
task2 = PythonOperator(
task_id='print_context',
python_callable=print_context,
dag=dag,
)
# Set task dependencies
task1 >> task2
Key Components Explained
- Imports: We import necessary modules from Airflow and Python's standard library.
- Default Arguments: These apply to all tasks in the DAG unless overridden.
- DAG Definition:
  - dag_id: Unique identifier for the DAG
  - schedule_interval: How often the DAG runs (from Airflow 2.4 onward, the preferred parameter name is schedule)
  - start_date: When the DAG should start being scheduled
  - catchup: Whether to run missed intervals if start_date is in the past
- Tasks: We define two tasks, one using BashOperator and one using PythonOperator.
- Dependencies: We establish that task2 should run after task1 using the >> operator.
Scheduling and Execution Model
Airflow follows a specific execution model:
- The scheduler evaluates your DAG at regular intervals.
- For each scheduled interval that hasn’t been run yet, it creates a DAG run.
- Each DAG run evaluates task dependencies to determine which tasks are ready to execute.
- Ready tasks are pushed to the executor.
- The executor runs the tasks according to its implementation (locally, on workers, etc.).
The execution_date (renamed to logical_date in newer Airflow versions) represents the start of the interval being processed, not when the DAG actually runs.
Task Context
Each task receives a context dictionary with useful metadata:
- ds: Execution date as YYYY-MM-DD
- ds_nodash: Execution date as YYYYMMDD
- prev_ds: Previous execution date
- next_ds: Next execution date
- dag: The DAG object
- task: The task object
- params: User-defined params passed to the DAG
- And many others
You can access these in your Python callables or through Jinja templates.
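As a short sketch of both access styles (assuming the tasks are declared inside a with DAG(...) block), the same metadata can be read via Jinja in templated operator fields or directly from the context in a callable:

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Jinja templating in a templated operator field
report = BashOperator(
    task_id='report_run',
    bash_command='echo "Run {{ ds }} of DAG {{ dag.dag_id }}, task {{ task.task_id }}"',
)

# Direct access inside a Python callable
def summarize(**context):
    print(context['ds'], context['ds_nodash'], context['params'])

summary = PythonOperator(task_id='summarize', python_callable=summarize)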
Operators in Depth
Operators encapsulate the logic of what a task actually does. Let’s explore some of the most useful ones:
BashOperator
The BashOperator executes bash commands:
from airflow.operators.bash import BashOperator
# Simple command
task = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
# Using environment variables
task = BashOperator(
task_id='print_env',
bash_command='echo $MY_VAR',
env={'MY_VAR': 'value'},
dag=dag,
)
# Using Jinja templating
task = BashOperator(
task_id='print_execution_date',
bash_command='echo "Execution date: {{ ds }}"',
dag=dag,
)
PythonOperator
The PythonOperator calls Python functions:
from airflow.operators.python import PythonOperator
def my_function(x, **context):
"""Example function that will be called by the PythonOperator."""
print(f"Value passed: {x}")
print(f"Execution date: {context['ds']}")
return x * 10
task = PythonOperator(
task_id='python_task',
python_callable=my_function,
op_kwargs={'x': 5},
dag=dag,
)
# Using templates
def template_function(**context):
return context['templates_dict']['my_param']
task = PythonOperator(
task_id='template_task',
python_callable=template_function,
templates_dict={'my_param': '{{ ds }}'},
dag=dag,
)
EmailOperator
Sends emails:
from airflow.operators.email import EmailOperator
email_task = EmailOperator(
task_id='send_email',
to='recipient@example.com',
subject='Airflow Alert: {{ ds }}',
html_content="""
<h3>Dag Run for {{ ds }} completed</h3>
<p>Task ID: {{ task.task_id }}</p>
""",
dag=dag,
)
SQLOperator Variants
Execute SQL queries on various databases:
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
# PostgreSQL
postgres_task = PostgresOperator(
task_id='postgres_query',
postgres_conn_id='postgres_default',
sql="""
INSERT INTO my_table (date, value)
VALUES ('{{ ds }}', 42)
""",
dag=dag,
)
# MySQL
mysql_task = MySqlOperator(
task_id='mysql_query',
mysql_conn_id='mysql_default',
sql="SELECT * FROM my_table WHERE date = '{{ ds }}'",
dag=dag,
)
Sensor Operators
Sensors are a special type of operator that waits for a condition to be true:
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.common.sql.sensors.sql import SqlSensor
# Wait for a file to appear
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file',
poke_interval=60, # check every minute
timeout=60*60*5, # timeout after 5 hours
mode='poke', # hold a worker slot between checks (use 'reschedule' to free it)
dag=dag,
)
# Wait for another DAG's task to complete
external_task_sensor = ExternalTaskSensor(
task_id='wait_for_other_dag_task',
external_dag_id='other_dag',
external_task_id='final_task',
execution_delta=timedelta(hours=1), # look for runs 1 hour earlier
dag=dag,
)
# Wait for a SQL query to return a non-empty result
sql_sensor = SqlSensor(
task_id='wait_for_data',
conn_id='postgres_default',
sql="SELECT 1 FROM my_table WHERE date = '{{ ds }}' LIMIT 1",
dag=dag,
)
TaskFlow API (Airflow 2.0+)
In newer versions of Airflow, you can use the TaskFlow API to create tasks with less boilerplate:
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False
)
def taskflow_example_dag():
"""Example DAG using the TaskFlow API."""
@task
def extract():
"""Extract data task."""
data = {'order_id': 42, 'customer_id': 100}
return data
@task
def transform(order_data):
"""Transform task that takes the output of extract as input."""
order_data['order_value'] = 100.0
return order_data
@task
def load(order_data):
"""Load task that takes the output of transform as input."""
print(f"Order data: {order_data}")
# Define the task dependencies
load(transform(extract()))
# Create the DAG
taskflow_dag = taskflow_example_dag()
This approach automatically handles passing data between tasks using XComs.
XComs: Passing Data Between Tasks
XComs (short for “cross-communications”) let tasks exchange small amounts of data:
Basic XCom Usage
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_function(**context):
"""Pushes a value to XCom."""
context['ti'].xcom_push(key='sample_value', value=42)
# Return values are automatically pushed to XCom with key 'return_value'
return "This also gets pushed to XCom"
def pull_function(**context):
"""Pulls values from XCom."""
# Pull value with specific key
value1 = context['ti'].xcom_pull(task_ids='push_task', key='sample_value')
# Pull return value
value2 = context['ti'].xcom_pull(task_ids='push_task')
print(f"Pulled values: {value1}, {value2}")
with DAG('xcom_example', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
)
push_task >> pull_task
XCom with TaskFlow API
The TaskFlow API makes working with XComs even easier:
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False)
def xcom_taskflow_example():
@task
def extract():
return {"value": 42}
@task
def process(data):
return {
"processed_value": data["value"] * 10
}
@task
def load(data):
print(f"Final value: {data['processed_value']}")
# Define the workflow
data = extract()
processed_data = process(data)
load(processed_data)
xcom_dag = xcom_taskflow_example()
XCom Limitations and Best Practices
XComs are stored in Airflow’s metadata database and are not designed for large data transfer:
- Size limitations: XComs are meant for small amounts of data (typically < 1MB).
- Performance impact: Large XComs can impact Airflow’s performance.
- Serialization: XCom values must be JSON-serializable by default (pickling can be enabled via enable_xcom_pickling, but it is discouraged).
For large data, consider these alternatives:
- Use intermediate storage like S3, HDFS, or a database
- Use Airflow connections to access shared resources
- Pass identifiers or file paths instead of the actual data
Avoid using XComs for large datasets, sensitive information, or as a primary data storage mechanism.
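As a sketch of the "pass references, not data" pattern (the bucket name and my_aws_conn connection ID are hypothetical), an extract task can write to S3 and push only the object key through XCom:

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def extract_to_s3(**context):
    """Write raw data to S3 and return only its key via XCom."""
    hook = S3Hook(aws_conn_id='my_aws_conn')
    key = f"staging/orders/{context['ds_nodash']}.csv"
    hook.load_string('order_id,value\n42,100.0\n', key=key, bucket_name='my-bucket', replace=True)
    return key  # a small reference, not the dataset itself

@task
def load_from_s3(key: str):
    hook = S3Hook(aws_conn_id='my_aws_conn')
    content = hook.read_key(key, bucket_name='my-bucket')
    print(f"Loaded {len(content)} bytes from s3://my-bucket/{key}")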
Sensors and Hooks
Sensors in Depth
Sensors are special operators that wait for conditions to be met:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta
with DAG('sensor_examples', start_date=datetime(2023, 1, 1)) as dag:
# Wait for a file to appear
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/tmp/data.csv',
poke_interval=60, # seconds between checks
timeout=3600, # timeout after 1 hour
)
# Wait for a specified amount of time
time_sensor = TimeDeltaSensor(
task_id='wait_1_hour',
delta=timedelta(hours=1),
)
# Wait for an HTTP endpoint to return a 200
http_sensor = HttpSensor(
task_id='check_api',
http_conn_id='http_default',
endpoint='api/v1/status',
response_check=lambda response: response.status_code == 200,
poke_interval=60,
timeout=3600,
)
Sensor Modes
Sensors have different modes that affect how they consume resources:
- poke (default): The sensor occupies a worker slot for its entire execution, sleeping between checks
- reschedule: The sensor releases the worker slot between checks and is rescheduled at each poke_interval
(The separate "Smart Sensor" feature from early Airflow 2.0 releases is not a per-sensor mode; it has since been deprecated in favor of deferrable operators.)
file_sensor = FileSensor(
task_id='efficient_file_sensor',
filepath='/tmp/data.csv',
mode='reschedule', # Release the worker between checks
poke_interval=300, # Check every 5 minutes
dag=dag,
)
Hooks: Connecting to External Systems
Hooks provide a standardized interface to external systems:
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
def hook_example(**context):
# Get a connection by ID
conn = BaseHook.get_connection('my_postgres_conn')
print(f"Host: {conn.host}, Login: {conn.login}")
# Use PostgreSQL hook
pg_hook = PostgresHook(postgres_conn_id='my_postgres_conn')
records = pg_hook.get_records("SELECT * FROM users LIMIT 5")
# Use S3 hook
s3_hook = S3Hook(aws_conn_id='my_aws_conn')
file_content = s3_hook.read_key(key='data.csv', bucket_name='my-bucket')
# Use HTTP hook
http_hook = HttpHook(http_conn_id='my_api_conn', method='GET')
response = http_hook.run('api/v1/data')
Creating Custom Hooks
You can create custom hooks to encapsulate complex connection logic:
from airflow.hooks.base import BaseHook
import requests
class MyAPIHook(BaseHook):
"""Hook to interact with a custom API."""
def __init__(self, api_conn_id='my_api_default'):
super().__init__()
self.api_conn_id = api_conn_id
self.connection = self.get_connection(api_conn_id)
self.base_url = self.connection.host
self.api_key = self.connection.password
def get_data(self, endpoint, params=None):
"""Get data from the API."""
url = f"{self.base_url}/{endpoint}"
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def post_data(self, endpoint, data):
"""Post data to the API."""
url = f"{self.base_url}/{endpoint}"
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
Using your custom hook:
def use_my_api(**context):
hook = MyAPIHook(api_conn_id='my_api_conn')
data = hook.get_data('users', params={'limit': 10})
print(f"Retrieved {len(data)} users")
Dynamic DAG Generation
One of Airflow’s powerful features is the ability to generate DAGs dynamically:
Generating Tasks in a Loop
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG('dynamic_tasks', start_date=datetime(2023, 1, 1)) as dag:
def process_data(data_id, **context):
print(f"Processing data for ID: {data_id}")
# Generate tasks dynamically
tasks = []
for i in range(1, 6):
task = PythonOperator(
task_id=f'process_data_{i}',
python_callable=process_data,
op_kwargs={'data_id': i},
)
tasks.append(task)
# Set dependencies
for i in range(len(tasks) - 1):
tasks[i] >> tasks[i + 1]
Generating DAGs from Configuration
You can generate multiple DAGs from external configuration:
import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# Load configuration
with open('/path/to/dags_config.yaml') as f:
config = yaml.safe_load(f)
# Create a DAG for each configuration
for dag_config in config['dags']:
dag_id = dag_config['id']
dag = DAG(
dag_id=dag_id,
schedule_interval=dag_config.get('schedule', '@daily'),
start_date=datetime.fromisoformat(dag_config.get('start_date', '2023-01-01')),
default_args=dag_config.get('default_args', {}),
)
# Add this DAG to the global namespace
globals()[dag_id] = dag
# Create tasks for this DAG
for task_config in dag_config['tasks']:
task = PythonOperator(
task_id=task_config['id'],
python_callable=globals()[task_config['function']],
op_kwargs=task_config.get('kwargs', {}),
dag=dag,
)
Example configuration file:
dags:
- id: data_processing_dag
schedule: '0 0 * * *'
start_date: '2023-01-01'
tasks:
- id: extract_data
function: extract_data_function
kwargs:
source: 'db1'
- id: transform_data
function: transform_data_function
- id: load_data
function: load_data_function
kwargs:
destination: 'warehouse'
- id: reporting_dag
schedule: '0 6 * * *'
tasks:
- id: generate_report
function: generate_report_function
Factory Functions for DAG Creation
A cleaner pattern is to use factory functions:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def create_data_processing_dag(dag_id, schedule, source_system, target_system):
"""Factory function to create a DAG."""
default_args = {
'owner': 'data_engineering',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id=dag_id,
default_args=default_args,
schedule_interval=schedule,
start_date=datetime(2023, 1, 1),
catchup=False,
)
def extract(**context):
print(f"Extracting data from {source_system}")
return {"extracted_rows": 100}
def transform(**context):
ti = context['ti']
extract_result = ti.xcom_pull(task_ids='extract')
print(f"Transforming {extract_result['extracted_rows']} rows")
return {"transformed_rows": extract_result['extracted_rows']}
def load(**context):
ti = context['ti']
transform_result = ti.xcom_pull(task_ids='transform')
print(f"Loading {transform_result['transformed_rows']} rows to {target_system}")
with dag:
extract_task = PythonOperator(
task_id='extract',
python_callable=extract,
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform,
)
load_task = PythonOperator(
task_id='load',
python_callable=load,
)
extract_task >> transform_task >> load_task
return dag
# Create multiple DAGs
for source, target, schedule in [
('mysql', 'warehouse', '0 0 * * *'),
('postgres', 'warehouse', '0 6 * * *'),
('mongodb', 'warehouse', '0 12 * * *')
]:
dag_id = f"{source}_to_{target}_dag"
globals()[dag_id] = create_data_processing_dag(
dag_id=dag_id,
schedule=schedule,
source_system=source,
target_system=target
)
Testing and Debugging
Testing DAGs is essential for reliable workflows. Here are several approaches:
Unit Testing Operator Logic
Test the functions that operators will call:
import unittest
from unittest.mock import patch
def process_data(value):
"""Function to be called by a PythonOperator."""
if value < 0:
raise ValueError("Value cannot be negative")
return value * 2
class TestProcessData(unittest.TestCase):
def test_process_data_positive(self):
"""Test process_data with positive input."""
result = process_data(5)
self.assertEqual(result, 10)
def test_process_data_negative(self):
"""Test process_data with negative input."""
with self.assertRaises(ValueError):
process_data(-1)
Testing DAG Structure
Test that your DAG structure is correct:
import unittest
from airflow.models import DagBag
class TestDagValidation(unittest.TestCase):
def setUp(self):
self.dagbag = DagBag()
def test_dags_load_without_errors(self):
"""Verify all DAGs can be imported without errors."""
self.assertEqual(
len(self.dagbag.import_errors),
0,
f"DAG import errors: {self.dagbag.import_errors}"
)
def test_dag_structure(self):
"""Test the structure of a specific DAG."""
dag_id = 'example_dag'
dag = self.dagbag.get_dag(dag_id)
# Make sure the DAG exists
self.assertIsNotNone(dag)
# Test task count
tasks = dag.tasks
self.assertEqual(len(tasks), 3, f"Expected 3 tasks, found {len(tasks)}")
# Test specific task existence
task_ids = [task.task_id for task in tasks]
self.assertIn('extract', task_ids)
self.assertIn('transform', task_ids)
self.assertIn('load', task_ids)
# Test task dependencies
upstream_task_ids = {
task.task_id: [t.task_id for t in task.upstream_list]
for task in tasks
}
self.assertEqual(upstream_task_ids['extract'], [])
self.assertEqual(upstream_task_ids['transform'], ['extract'])
self.assertEqual(upstream_task_ids['load'], ['transform'])
Mocking External Dependencies
When testing tasks that interact with external systems, use mocks:
import unittest
from unittest.mock import patch, MagicMock
from airflow.models import DAG
from airflow.operators.python import PythonOperator
def fetch_data_from_api(api_endpoint, **context):
"""Function that calls an external API."""
import requests
response = requests.get(api_endpoint)
response.raise_for_status()
return response.json()
class TestApiTask(unittest.TestCase):
@patch('requests.get')
def test_fetch_data_from_api(self, mock_get):
"""Test the function that fetches API data."""
# Configure the mock
mock_response = MagicMock()
mock_response.json.return_value = {'data': [1, 2, 3]}
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
# Test the function
result = fetch_data_from_api('https://example.com/api')
# Assertions
mock_get.assert_called_once_with('https://example.com/api')
self.assertEqual(result, {'data': [1, 2, 3]})
Using the Airflow CLI for Testing
The Airflow CLI provides tools for testing:
# Test a specific task
airflow tasks test example_dag extract 2023-01-01
# Test DAG parsing
airflow dags list
# Backfill a date range to test full DAG execution
airflow dags backfill --start-date 2023-01-01 --end-date 2023-01-02 example_dag
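For an end-to-end dry run, Airflow can also execute a whole DAG in a single local process without a scheduler:

# Run every task of the DAG for one logical date, in-process
airflow dags test example_dag 2023-01-01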
Debugging with Airflow
Strategies for debugging Airflow DAGs:
- Local execution: Run tasks with the CLI to debug issues.
- Incremental development: Add and test tasks one by one.
- Use print statements: Include print statements in Python callables for visibility.
- Leverage Airflow logs: Check logs in the UI or on disk.
- XCom inspection: Use XComs to inspect intermediate values.
Advanced debugging example:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
def debug_task(**context):
"""Task with extensive debugging."""
logger.info("Task started")
# Log the task context
logger.info(f"Execution date: {context['ds']}")
logger.info(f"Task instance: {context['ti']}")
# Log details of a known connection (BaseHook.get_connection requires a conn_id)
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('postgres_default')
logger.info(f"Connection host: {conn.host}")
# Log selected environment variables (avoid dumping all of os.environ, which can leak secrets into logs)
import os
logger.info(f"AIRFLOW_HOME: {os.environ.get('AIRFLOW_HOME')}")
# Log intermediate results (some_calculation is a placeholder for the logic being debugged)
result = some_calculation()
logger.info(f"Intermediate result: {result}")
# Push debugging info to XCom so it can be inspected in the UI
context['ti'].xcom_push(key='debug_info', value={'result': result})
return result
with DAG('debug_dag', start_date=datetime(2023, 1, 1)) as dag:
debug = PythonOperator(
task_id='debug_task',
python_callable=debug_task,
)
Performance Optimization
As your Airflow deployment grows, performance optimization becomes important:
DAG Performance Best Practices
- Minimize DAG file parse time:
  - Avoid expensive imports at module level
  - Keep top-level code lightweight
  - Cache expensive computations
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# BAD: Expensive operation at module level
# import pandas as pd
# df = pd.read_csv('large_file.csv') # This runs every time Airflow parses the DAG!
# GOOD: Move expensive operations inside task functions
def process_data(**context):
import pandas as pd # Import only when task executes
df = pd.read_csv('large_file.csv')
# Process data...
return df.shape[0]
with DAG('optimized_dag', start_date=datetime(2023, 1, 1)) as dag:
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
- Optimize task scheduling:
  - Use appropriate depends_on_past settings
  - Set concurrency limits for DAGs and tasks
  - Use pools to manage resource contention
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
'resource_optimized_dag',
start_date=datetime(2023, 1, 1),
# Prevent multiple concurrent runs of this DAG
max_active_runs=1,
# Limit concurrent task instances within this DAG (formerly `concurrency`)
max_active_tasks=5,
default_args={
# Only run if previous runs succeeded
'depends_on_past': True,
# Back off exponentially on retries
'retry_exponential_backoff': True,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
) as dag:
# Resource-intensive task
heavy_task = PythonOperator(
task_id='heavy_processing',
python_callable=lambda: None,
# Assign to a resource pool
pool='cpu_intensive_pool',
# Set a higher priority (a larger priority_weight means higher priority)
priority_weight=10,
# Set execution timeout
execution_timeout=timedelta(hours=1),
)
- Data transfer optimization:
  - Avoid large XComs
  - Use external storage for data transfer
  - Pass references (paths, IDs) instead of data
def extract(**context):
"""Extract data and save to a file."""
data = fetch_large_dataset()  # placeholder for your actual extraction logic
file_path = f"/tmp/data_{context['ds_nodash']}.parquet"
# Save to disk instead of using XCom
data.to_parquet(file_path)
# Push only the file path to XCom
return {'file_path': file_path}
def transform(**context):
"""Transform using file path from previous task."""
ti = context['ti']
file_info = ti.xcom_pull(task_ids='extract')
import pandas as pd
data = pd.read_parquet(file_info['file_path'])
# Transform data...
Executor Configuration
Choose the right executor for your workload:
- LocalExecutor: Good for small deployments
- CeleryExecutor: Distributed execution for scaling
- KubernetesExecutor: Dynamic scaling with Kubernetes
Example CeleryExecutor configuration:
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
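With CeleryExecutor, each worker machine also needs the Celery worker process running:

# Run on every worker node
airflow celery worker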
Database Optimization
The metadata database can become a bottleneck:
- Regular maintenance:
  - Configure log cleanups
  - Set appropriate history retention
  - Run database vacuum regularly
- Indexing:
  - Add custom indexes for frequent queries
  - Monitor slow queries
-- Add index for task instance lookups (Airflow 2.2+ keys task_instance by run_id, not execution_date)
CREATE INDEX IF NOT EXISTS ti_dag_run_lookup ON task_instance (dag_id, run_id);
-- Add index for task failure lookups
CREATE INDEX IF NOT EXISTS ti_state ON task_instance (state);
- Connection pooling:
  - Use PgBouncer for PostgreSQL (see the sketch below)
  - Tune connection pool settings
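A minimal PgBouncer sketch (values are illustrative; point sql_alchemy_conn at PgBouncer's port instead of Postgres directly):

; pgbouncer.ini (illustrative)
[databases]
airflow = host=postgres port=5432 dbname=airflow

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = md5
pool_mode = transaction
default_pool_size = 20
max_client_conn = 200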
Security Best Practices
Securing your Airflow deployment is critical:
Secure Connections and Variables
Sensitive credentials should be securely stored:
from airflow.models import Connection, Variable
from airflow.hooks.base import BaseHook
# Create encrypted connections via UI or CLI
# airflow connections add 'my_db' --conn-uri 'postgresql://user:pass@host:port/db'
def secure_task(**context):
# Retrieve securely stored connection
conn = BaseHook.get_connection('my_db')
print(f"Connected to {conn.host} as {conn.login}")
# Retrieve securely stored variable
api_key = Variable.get("api_key")
# Use but don't print the key
Never hardcode credentials in DAG files!
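Connections and Variables can also be injected through environment variables, which keeps secrets out of DAG files and the metadata database (the connection and variable names below are illustrative):

# Airflow resolves AIRFLOW_CONN_<CONN_ID> and AIRFLOW_VAR_<NAME> automatically
export AIRFLOW_CONN_MY_DB='postgresql://user:pass@host:5432/db'
export AIRFLOW_VAR_API_KEY='super-secret-value'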
Fernet Key for Encryption
Airflow uses Fernet for encrypting sensitive information:
# Generate a Fernet key
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Set in airflow.cfg
[core]
fernet_key = your_generated_key_here
# Or as an environment variable
export AIRFLOW__CORE__FERNET_KEY=your_generated_key_here
Role-Based Access Control (RBAC)
Configure user roles and permissions:
In Airflow 2.x the RBAC webserver is always enabled (the old [webserver] rbac = True flag from 1.10 is gone). Built-in roles include Admin, Op, User, Viewer, and Public; manage them via the UI or CLI:

# Assign a role when creating a user
airflow users create \
    --username analyst \
    --password changeme \
    --firstname Data \
    --lastname Analyst \
    --role Viewer \
    --email analyst@example.com

# Inspect existing roles and their permissions
airflow roles list
Limiting Task Access
Restrict what tasks can do:
- Run as a different user:

  task = BashOperator(
      task_id='restricted_task',
      bash_command='whoami',
      run_as_user='airflow',
  )

- Use a Docker operator for isolation:

  from airflow.providers.docker.operators.docker import DockerOperator

  task = DockerOperator(
      task_id='dockerized_task',
      image='python:3.9-slim',
      command='python -c "print(\'Hello\')"',
      auto_remove=True,
      docker_url='unix://var/run/docker.sock',
  )

- Kubernetes Pod Operator for stronger isolation:

  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

  task = KubernetesPodOperator(
      task_id='k8s_task',
      name='pod-task',
      namespace='default',
      image='python:3.9-slim',
      cmds=["python", "-c"],
      arguments=["print('Hello')"],
      service_account_name='airflow',
      security_context={
          'runAsUser': 1000,
          'allowPrivilegeEscalation': False,
      },
  )
Network Security
Secure your Airflow components:
# In airflow.cfg
[webserver]
web_server_ssl_cert = /path/to/cert.pem
web_server_ssl_key = /path/to/key.pem
# API authentication (Airflow 2.x; webserver logins are always required)
[api]
auth_backends = airflow.api.auth.backend.basic_auth
Production Deployment
When deploying Airflow to production, consider these best practices:
Containerization with Docker
Using Docker provides consistency and isolation:
# Example Dockerfile for a custom Airflow image
FROM apache/airflow:2.7.1
USER root
# Install additional system dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
USER airflow
# Install Python packages
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
# Copy custom DAGs, plugins, etc.
COPY --chown=airflow:root dags /opt/airflow/dags
COPY --chown=airflow:root plugins /opt/airflow/plugins
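Building and tagging the image is then a standard Docker build (the tag is only an example):

docker build -t my-company/airflow:2.7.1 .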
Kubernetes Deployment
For production-grade deployments, Kubernetes with the official Helm chart is recommended:
# Add the Airflow Helm repository
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Create a values file for customization
cat > custom-values.yaml << EOL
executor: CeleryExecutor
postgresql:
enabled: true
redis:
enabled: true
webserver:
replicas: 2
scheduler:
replicas: 2
workers:
replicas: 3
dags:
persistence:
enabled: true
size: 1Gi
logs:
persistence:
enabled: true
size: 5Gi
EOL
# Install Airflow
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
-f custom-values.yaml
Continuous Deployment for DAGs
Implement CI/CD for your DAGs:
- Version control: Store DAGs in a Git repository
- CI checks: Run linting and tests on pull requests
- CD pipeline: Automatically deploy to production
Example GitHub Actions workflow:
name: Airflow DAG CI/CD
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install apache-airflow pytest pylint
pip install -r requirements.txt
- name: Lint with pylint
run: |
pylint --disable=C0111,C0103 dags/
- name: Test with pytest
run: |
pytest tests/
deploy:
needs: test
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Deploy DAGs to S3
run: |
aws s3 sync dags/ s3://my-airflow-dags-bucket/dags/
Monitoring and Alerting
Set up comprehensive monitoring:
- Metrics with Prometheus:

  # airflow.cfg
  [metrics]
  statsd_on = True
  statsd_host = localhost
  statsd_port = 8125
  statsd_prefix = airflow
- Visualization with Grafana:
  - Create dashboards for DAG performance
  - Track task successes/failures
  - Monitor system resources
- Alerting:

  from airflow import DAG
  from airflow.operators.python import PythonOperator
  from airflow.operators.email import EmailOperator
  from datetime import datetime

  with DAG('monitored_dag', start_date=datetime(2023, 1, 1)) as dag:
      # Critical task
      task = PythonOperator(
          task_id='critical_task',
          python_callable=lambda: None,
          email_on_failure=True,
          email='alerts@example.com',
      )

      # Custom alerting logic
      alert = EmailOperator(
          task_id='alert_ops',
          to='ops@example.com',
          subject='DAG {{ dag.dag_id }} {{ dag_run.run_id }} - Issue detected',
          html_content='Task {{ ti.task_id }} has encountered a problem.',
          trigger_rule='one_failed',
      )

      task >> alert
Advanced Patterns
Take your Airflow skills to the next level with these advanced patterns:
Branching and Conditional Execution
Control task execution flow with branching:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
def _choose_path(**context):
"""Decide which path to take based on execution date."""
if context['execution_date'].day % 2 == 0:
return 'even_day_task'
else:
return 'odd_day_task'
with DAG('branching_dag', start_date=datetime(2023, 1, 1)) as dag:
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=_choose_path,
)
even_day = PythonOperator(
task_id='even_day_task',
python_callable=lambda: print("Even day processing"),
)
odd_day = PythonOperator(
task_id='odd_day_task',
python_callable=lambda: print("Odd day processing"),
)
join = PythonOperator(
task_id='join_task',
python_callable=lambda: print("Processing complete"),
trigger_rule='one_success', # Run after any upstream task succeeds
)
branch >> [even_day, odd_day] >> join
Task Groups for Organization
Organize related tasks together:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
with DAG('task_group_dag', start_date=datetime(2023, 1, 1)) as dag:
start = PythonOperator(
task_id='start',
python_callable=lambda: print("Starting pipeline"),
)
# Create a task group for extract tasks
with TaskGroup(group_id='extract_tasks') as extract_group:
extract_a = PythonOperator(
task_id='extract_a',
python_callable=lambda: print("Extracting A"),
)
extract_b = PythonOperator(
task_id='extract_b',
python_callable=lambda: print("Extracting B"),
)
# Create a task group for transform tasks
with TaskGroup(group_id='transform_tasks') as transform_group:
transform_a = PythonOperator(
task_id='transform_a',
python_callable=lambda: print("Transforming A"),
)
transform_b = PythonOperator(
task_id='transform_b',
python_callable=lambda: print("Transforming B"),
)
load = PythonOperator(
task_id='load',
python_callable=lambda: print("Loading data"),
)
# Set dependencies
start >> extract_group >> transform_group >> load
# You can also set dependencies within groups
# extract_a >> transform_a
# extract_b >> transform_b
Custom Operators
Create reusable task components:
from airflow.models.baseoperator import BaseOperator
class DataQualityOperator(BaseOperator):
"""
Custom operator for data quality checks.
"""
# Note: the @apply_defaults decorator is no longer needed in Airflow 2.x
def __init__(
self,
conn_id,
table,
checks,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.table = table
self.checks = checks
def execute(self, context):
"""Execute data quality checks."""
self.log.info(f"Running data quality checks on {self.table}")
# Use a hook to connect to the database
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id=self.conn_id)
failed_tests = []
for check in self.checks:
# Each check is a dictionary with query and expected result
query = check['query']
expected = check['expected']
self.log.info(f"Running check: {query}")
records = hook.get_records(query)
if not records or not records[0]:
failed_tests.append(f"Check returned no results: {query}")
continue
actual = records[0][0]
self.log.info(f"Check result: {actual}")
if actual != expected:
failed_tests.append(f"Check failed: {query}, expected {expected}, got {actual}")
if failed_tests:
self.log.error("Data quality checks failed!")
self.log.error("\n".join(failed_tests))
raise ValueError("Data quality checks failed")
self.log.info("All data quality checks passed!")
Using the custom operator:
from airflow import DAG
from datetime import datetime
with DAG('data_quality_dag', start_date=datetime(2023, 1, 1)) as dag:
dq_check = DataQualityOperator(
task_id='check_users_table',
conn_id='my_postgres',
table='users',
checks=[
{
'query': "SELECT COUNT(*) FROM users WHERE email IS NULL",
'expected': 0
},
{
'query': "SELECT COUNT(*) > 0 FROM users",
'expected': True  # execute() compares with ==, so use literal values (extend the operator for callable checks)
}
],
)
Subdag Pattern (Legacy)
The SubDAG pattern is legacy: SubDagOperator has been deprecated since Airflow 2.0, and TaskGroups are the preferred way to group tasks. It is shown here only for completeness.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime
def load_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
"""Creates a subdag for loading data."""
dag = DAG(
dag_id=f'{parent_dag_id}.{child_dag_id}',
start_date=start_date,
schedule_interval=schedule_interval,
)
with dag:
load_a = PythonOperator(
task_id='load_a',
python_callable=lambda: print("Loading A"),
)
load_b = PythonOperator(
task_id='load_b',
python_callable=lambda: print("Loading B"),
)
return dag
with DAG('main_dag', start_date=datetime(2023, 1, 1)) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=lambda: print("Extracting data"),
)
transform = PythonOperator(
task_id='transform',
python_callable=lambda: print("Transforming data"),
)
load = SubDagOperator(
task_id='load',
subdag=load_subdag(
parent_dag_id='main_dag',
child_dag_id='load',
start_date=dag.start_date,
schedule_interval=dag.schedule_interval,
),
)
extract >> transform >> load
Triggers
Create event-based workflows:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
# DAG 1: Triggers DAG 2 on completion
with DAG('trigger_dag', start_date=datetime(2023, 1, 1)) as dag1:
task = PythonOperator(
task_id='process_data',
python_callable=lambda: print("Processing data"),
)
trigger = TriggerDagRunOperator(
task_id='trigger_reporting',
trigger_dag_id='reporting_dag',
execution_date='{{ ds }}', # Pass the same execution date
reset_dag_run=True, # If the target DAG run exists, reset it
wait_for_completion=True, # Wait for the triggered DAG to complete
)
task >> trigger
# DAG 2: The reporting DAG
with DAG('reporting_dag', start_date=datetime(2023, 1, 1)) as dag2:
# Wait for the first DAG to complete
wait = ExternalTaskSensor(
task_id='wait_for_processing',
external_dag_id='trigger_dag',
external_task_id='process_data',
execution_date_fn=lambda dt: dt, # Use the same execution date
mode='reschedule',
timeout=3600,
)
report = PythonOperator(
task_id='generate_report',
python_callable=lambda: print("Generating report"),
)
wait >> report
Troubleshooting Common Issues
When things go wrong, use these troubleshooting steps:
DAG Not Appearing in UI
- Check file permissions:

  ls -la $AIRFLOW_HOME/dags/

- Verify DAG file syntax by running the file directly:

  python $AIRFLOW_HOME/dags/your_dag_file.py

- Check the scheduler logs:

  cat $AIRFLOW_HOME/logs/scheduler/*.log | grep "your_dag_id"

- Verify DAG directory configuration:

  grep "dags_folder" $AIRFLOW_HOME/airflow.cfg
Tasks Not Running
- Check task dependencies:
  - Verify that upstream tasks are completing
  - Look for depends_on_past=True settings
  - Check for missing external dependencies
- Verify pool availability:
  - Check if the task uses a pool and if slots are available
  - Adjust pool sizes if needed
- Check concurrency settings:

  # airflow.cfg
  [core]
  parallelism = 32               # Max total tasks across all DAGs
  max_active_tasks_per_dag = 16  # Max concurrent tasks per DAG
Failed Tasks
- Check task logs in the UI or on disk (path shown is the Airflow 2.3+ default layout):

  cat "$AIRFLOW_HOME/logs/dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/"*.log

- Re-run with debug info:

  # Increase log level
  export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG

  # Re-run the task
  airflow tasks test dag_id task_id execution_date

- Common failure causes:
  - Connectivity issues to external systems
  - Missing dependencies
  - Resource constraints
  - Timeouts
  - Permissions problems
Performance Issues
- Check database health:

  -- For PostgreSQL
  SELECT state, COUNT(*) FROM task_instance GROUP BY state;

  -- Check for long-running queries
  SELECT pid, now() - pg_stat_activity.query_start AS duration, query
  FROM pg_stat_activity
  WHERE pg_stat_activity.query != '<IDLE>'
  ORDER BY duration DESC;

- Monitor resource usage:

  # Check worker resources
  htop

  # Check disk usage
  df -h

  # Check log size
  du -sh $AIRFLOW_HOME/logs/

- Optimize DAG parsing:
  - Keep DAG serialization enabled (the default in Airflow 2.x)
  - Raise the minimum file processing interval (see the sketch after this list)
  - Create smaller, focused DAG files
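The parsing-related knobs live under the [scheduler] section; a sketch with illustrative values:

# airflow.cfg
[scheduler]
min_file_process_interval = 60   # seconds between re-parsing each DAG file
parsing_processes = 2            # parallel DAG-file parsing processes
dag_dir_list_interval = 120      # how often to scan the dags folder for new files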
Resources and Further Learning
Official Documentation
- Apache Airflow documentation: https://airflow.apache.org/docs/
Community Resources
- Airflow Slack Channel
- Awesome Apache Airflow - Curated list of resources
Books
- “Data Pipelines with Apache Airflow” by Bas Harenslak and Julian de Ruiter
- “Fundamentals of Data Engineering” by Joe Reis and Matt Housley
Join the Airflow community and contribute! Open source projects thrive on community involvement, whether through code contributions, documentation, or simply helping others in forums.
Conclusion
Apache Airflow provides a powerful platform for building and managing data pipelines. By understanding its architecture, concepts, and best practices, you can create robust, maintainable, and scalable workflows.
Key takeaways from this guide:
- DAGs are central to Airflow, providing a clear representation of workflow dependencies
- Python-based definition gives flexibility and power in workflow creation
- Operators encapsulate task logic, making workflows modular and reusable
- Testing is essential for reliable pipeline operation
- Production deployment requires careful consideration of security, scaling, and monitoring
As data pipelines grow in complexity, Airflow’s programmatic approach and rich ecosystem of integrations provide the tools needed to maintain control and visibility over your workflows.
We hope this guide helps you on your journey with Apache Airflow. Happy orchestrating!