This guide assumes familiarity with Python programming and data pipeline concepts. We'll progressively build from core concepts to advanced implementation patterns.


Introduction to Apache Airflow

Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. Developed at Airbnb in 2014 and later contributed to the Apache Software Foundation, Airflow has become the de facto standard for orchestrating complex computational workflows and data processing pipelines.

Why Airflow?

Traditional scheduling tools like cron lack visibility, dependency management, and error handling capabilities required for complex data pipelines. Airflow addresses these limitations by providing:

  1. Programmatic workflow creation - Define workflows as Python code
  2. Visualization of pipeline dependencies - DAG view shows relationships between tasks
  3. Extensible architecture - Easy integration with various systems via operators and hooks
  4. Scalable execution model - Distribute task execution across workers
  5. Rich UI - Monitor, troubleshoot, and manage workflows

Airflow excels in scenarios requiring complex dependencies between tasks and integration with multiple systems. For simpler workflows, lighter solutions might be more appropriate.

Airflow’s Architecture

Airflow operates with several key components:

  • Webserver: Provides the UI for monitoring and managing DAGs and tasks
  • Scheduler: Orchestrates task execution based on dependencies and schedules
  • Executor: Determines how tasks get run (locally, on remote workers, etc.)
  • Metadata Database: Stores DAG and task execution metadata
  • DAG Directory: Contains Python files defining workflow DAGs

The architecture follows a message-passing paradigm where the scheduler identifies tasks ready for execution and passes them to the executor, which distributes them to workers.


Core Concepts

To effectively work with Airflow, you need to understand several fundamental concepts:

DAGs (Directed Acyclic Graphs)

A DAG represents a workflow as a collection of tasks with dependencies between them. The “directed” aspect means that every dependency has a direction (an upstream task runs before its downstream task), while “acyclic” means there are no cycles (a task cannot depend on itself directly or indirectly).

DAGs in Airflow are defined in Python, allowing for dynamic generation and complex logic in workflow construction.
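
The two properties can be illustrated without Airflow at all. The following is a minimal sketch using Python's standard-library graphlib (Python 3.9+); the task names are hypothetical, and this is not how Airflow itself stores or validates DAGs.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are hypothetical.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid DAG has at least one execution order that respects every edge.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Making "extract" depend on "load" closes a loop, so no valid order exists.
cyclic = dict(dependencies, extract={"load"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The first graph yields an order in which every task follows its upstream tasks; the second cannot be ordered, which is the situation the “acyclic” requirement rules out.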

Tasks

Tasks are the units of execution in Airflow. Each task represents a piece of work to be done and is implemented using an Operator. Tasks are arranged within a DAG to form the workflow.

Operators

Operators determine what gets done by a task. Airflow provides many built-in operators:

  • BashOperator: Executes bash commands
  • PythonOperator: Calls Python functions
  • SQLOperator variants: Execute SQL queries
  • Sensor Operators: Wait for conditions to be met
  • Transfer Operators: Move data between systems

Task Instances

A task instance represents a specific run of a task at a point in time. It has its own state (running, success, failed, etc.) and execution context.

Task Dependencies

Dependencies define the order in which tasks should run. In Airflow, you can use multiple methods to establish dependencies:

# Method 1: Using the >> operator
task1 >> task2 >> task3
 
# Method 2: Using the << operator
task3 << task2 << task1
 
# Method 3: Using the set_upstream/set_downstream methods
task1.set_downstream(task2)
task2.set_downstream(task3)
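
To see why the three methods are equivalent, it can help to model the bitshift operators as thin wrappers around set_downstream and set_upstream. The class below is a toy stand-in written for illustration (ToyTask is a hypothetical name), not Airflow's operator implementation.

```python
class ToyTask:
    """Toy stand-in for an Airflow task; not Airflow's actual operator class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b: b runs after a. Returning b lets chains like a >> b >> c work.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b: a runs after b. Returning b lets chains like c << b << a work.
        self.set_upstream(other)
        return other


t1, t2, t3 = ToyTask("task1"), ToyTask("task2"), ToyTask("task3")
t1 >> t2 >> t3
print([t.task_id for t in t1.downstream], [t.task_id for t in t2.downstream])
```

Because each operator returns its right-hand operand, a chain is evaluated pair by pair from left to right, which is why both chained forms produce the same edges.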

Execution Date and Data Intervals

Each DAG run is associated with a logical execution date (in Airflow 2.2+, this concept was expanded to data intervals). This doesn’t necessarily represent when the DAG runs but rather what data period the run is processing.
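
As a rough illustration of the difference between the logical date and the actual run time, the sketch below computes the interval covered by one run of a daily schedule. It is a simplified model with a hypothetical helper name, assuming fixed 24-hour intervals and no time zones.

```python
from datetime import datetime, timedelta


def daily_data_interval(logical_date):
    """Return (start, end) of the daily interval that a run with this logical date covers."""
    start = logical_date
    end = logical_date + timedelta(days=1)
    return start, end


logical_date = datetime(2023, 1, 1)
interval_start, interval_end = daily_data_interval(logical_date)

# The run is triggered only once the interval has ended, so a run labeled
# 2023-01-01 processes that day's data and starts at or after 2023-01-02 00:00.
earliest_run_time = interval_end
print(interval_start, interval_end, earliest_run_time)
```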


Setting Up Your Airflow Environment

There are several ways to set up Airflow, depending on your needs:

Method 1: Using pip (Development Environment)

# Create a virtual environment
python -m venv airflow-env
source airflow-env/bin/activate  # On Windows: airflow-env\Scripts\activate
 
# Install Airflow with extras (the amazon extra provides the S3 integration)
pip install "apache-airflow[postgres,amazon,http]==2.7.1"
 
# Set the Airflow home directory
export AIRFLOW_HOME=~/airflow
 
# Initialize the metadata database
# (Airflow 2.7+; on older releases use `airflow db init`, which 2.7 deprecates)
airflow db migrate
 
# Create a user
airflow users create \
    --username admin \
    --password admin \
    --firstname First \
    --lastname Last \
    --role Admin \
    --email admin@example.com
 
# Start the webserver and scheduler
# (both run in the foreground, so use a separate terminal for each)
airflow webserver --port 8080
airflow scheduler

Method 2: Using Docker Compose

Create a docker-compose.yml file:

version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
 
  webserver:
    image: apache/airflow:2.7.1
    depends_on:
      - postgres
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY:-}
      - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
      - AIRFLOW__CORE__LOAD_EXAMPLES=false
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
 
  scheduler:
    image: apache/airflow:2.7.1
    depends_on:
      - postgres
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY:-}
      - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
      - AIRFLOW__CORE__LOAD_EXAMPLES=false
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
    command: scheduler
 
volumes:
  postgres-db-volume:

Then run:

# Generate a Fernet key
pip install cryptography
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
 
# Set the key as an environment variable
export FERNET_KEY=<your_generated_key>
 
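# Initialize the metadata database and create a user (first start only)
docker-compose run --rm webserver airflow db migrate
docker-compose run --rm webserver airflow users create \
    --username admin --password admin \
    --firstname First --lastname Last \
    --role Admin --email admin@example.com
 
# Start the services
docker-compose up -d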

Method 3: Using Kubernetes with Helm (Production)

For production deployments, Kubernetes with the official Helm chart is recommended. This is beyond the scope of this basic setup guide, but you can refer to the Airflow Helm chart documentation.


DAG Fundamentals

Let’s create a simple DAG to understand the basic structure:

"""
Example DAG demonstrating basic Airflow concepts.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
 
# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
 
# Instantiate the DAG
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example'],
)
 
# Define tasks
task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
 
def print_context(**context):
    """Print the Airflow context and ds variable from the context."""
    print(f"Execution date: {context['ds']}")
    print(f"Logical date: {context['logical_date']}")
    return 'Task completed successfully!'
 
task2 = PythonOperator(
    task_id='print_context',
    python_callable=print_context,
    dag=dag,
)
 
# Set task dependencies
task1 >> task2

Key Components Explained

  1. Imports: We import necessary modules from Airflow and Python’s standard library.

  2. Default Arguments: These apply to all tasks in the DAG unless overridden.

  3. DAG Definition:

    • dag_id: Unique identifier for the DAG
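    • schedule_interval: How often the DAG runs (Airflow 2.4+ prefers the equivalent schedule argument; schedule_interval still works but is deprecated)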
    • start_date: When the DAG should start being scheduled
    • catchup: Whether to run missed intervals if start_date is in the past
  4. Tasks: We define two tasks, one using BashOperator and one using PythonOperator.

  5. Dependencies: We establish that task2 should run after task1 using the >> operator.
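
The effect of catchup is easiest to see by listing which runs would be created for a start_date in the past. The function below is a simplified plain-Python model with hypothetical names, assuming fixed intervals and a run being created only after its interval has ended; the real scheduler handles many more cases.

```python
from datetime import datetime, timedelta


def runs_to_create(start_date, now, interval, catchup):
    """Logical dates of the runs a scheduler would create for a DAG enabled at `now`.

    Simplified model: a run exists for an interval only after that interval has ended.
    """
    logical_dates = []
    current = start_date
    while current + interval <= now:
        logical_dates.append(current)
        current += interval
    # Without catchup, only the most recent completed interval is run.
    return logical_dates if catchup else logical_dates[-1:]


start = datetime(2023, 1, 1)
now = datetime(2023, 1, 4, 12)
day = timedelta(days=1)

print(runs_to_create(start, now, day, catchup=True))
print(runs_to_create(start, now, day, catchup=False))
```

With catchup enabled, the model yields one run per completed day since start_date; with catchup disabled, only the latest completed day is run.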

Scheduling and Execution Model

Airflow follows a specific execution model:

  1. The scheduler evaluates your DAG at regular intervals.
  2. For each scheduled interval that hasn’t been run yet, it creates a DAG run.
  3. Each DAG run evaluates task dependencies to determine which tasks are ready to execute.
  4. Ready tasks are pushed to the executor.
  5. The executor runs the tasks according to its implementation (locally, on workers, etc.).

The execution_date (renamed to logical_date in newer Airflow versions) represents the start of the interval being processed, not when the DAG actually runs.
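
Step 3 of the model above, deciding which tasks are ready, can be sketched in a few lines. This is an illustrative model only: it assumes the default behavior in which a task runs once all of its upstream tasks have succeeded, and the function and task names are hypothetical.

```python
def ready_tasks(upstream, states):
    """Return tasks with no state yet whose upstream tasks have all succeeded."""
    return sorted(
        task
        for task, parents in upstream.items()
        if states.get(task) is None
        and all(states.get(parent) == "success" for parent in parents)
    )


upstream = {"extract": [], "transform": ["extract"], "load": ["transform"]}
print(ready_tasks(upstream, {}))
print(ready_tasks(upstream, {"extract": "success"}))
print(ready_tasks(upstream, {"extract": "failed"}))
```

In this model a failed upstream task leaves nothing ready, which mirrors why a failure early in a pipeline stops the tasks that depend on it.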

Task Context

Each task receives a context dictionary with useful metadata:

  • ds: Execution date as YYYY-MM-DD
  • ds_nodash: Execution date as YYYYMMDD
  • prev_ds: Previous execution date (deprecated since Airflow 2.2)
  • next_ds: Next execution date (deprecated since Airflow 2.2)
  • dag: The DAG object
  • task: The task object
  • params: User-defined params passed to the DAG
  • And many others

You can access these in your Python callables or through Jinja templates.
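
The date-string entries are plain formatted views of the run's logical date. The sketch below rebuilds two of them by hand to show the formats; date_macros is a hypothetical helper, and a real context contains many more keys.

```python
from datetime import datetime


def date_macros(logical_date):
    """Build the date-string entries of a context-like dict (subset only)."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
    }


context = date_macros(datetime(2023, 1, 1))
# A templated string such as 'echo "{{ ds }}"' would be rendered with this value.
print(context["ds"], context["ds_nodash"])
```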


Operators in Depth

Operators encapsulate the logic of what a task actually does. Let’s explore some of the most useful ones:

BashOperator

The BashOperator executes bash commands:

from airflow.operators.bash import BashOperator
 
# Simple command
task = BashOperator(
    task_id='print_hello',
    bash_command='echo "Hello World"',
    dag=dag,
)
 
# Using environment variables
task = BashOperator(
    task_id='print_env',
    bash_command='echo $MY_VAR',
    env={'MY_VAR': 'value'},
    dag=dag,
)
 
# Using Jinja templating
task = BashOperator(
    task_id='print_execution_date',
    bash_command='echo "Execution date: {{ ds }}"',
    dag=dag,
)

PythonOperator

The PythonOperator calls Python functions:

from airflow.operators.python import PythonOperator
 
def my_function(x, **context):
    """Example function that will be called by the PythonOperator."""
    print(f"Value passed: {x}")
    print(f"Execution date: {context['ds']}")
    return x * 10
 
task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'x': 5},
    dag=dag,
)
 
# Using templates
def template_function(**context):
    return context['templates_dict']['my_param']
 
task = PythonOperator(
    task_id='template_task',
    python_callable=template_function,
    templates_dict={'my_param': '{{ ds }}'},
    dag=dag,
)

EmailOperator

Sends emails:

from airflow.operators.email import EmailOperator
 
email_task = EmailOperator(
    task_id='send_email',
    to='recipient@example.com',
    subject='Airflow Alert: {{ ds }}',
    html_content="""
        <h3>Dag Run for {{ ds }} completed</h3>
        <p>Task ID: {{ task.task_id }}</p>
    """,
    dag=dag,
)

SQLOperator Variants

Execute SQL queries on various databases:

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
 
# PostgreSQL
postgres_task = PostgresOperator(
    task_id='postgres_query',
    postgres_conn_id='postgres_default',
    sql="""
        INSERT INTO my_table (date, value)
        VALUES ('{{ ds }}', 42)
    """,
    dag=dag,
)
 
# MySQL
mysql_task = MySqlOperator(
    task_id='mysql_query',
    mysql_conn_id='mysql_default',
    sql="SELECT * FROM my_table WHERE date = '{{ ds }}'",
    dag=dag,
)

Sensor Operators

Sensors are a special type of operator that waits for a condition to be true:

from datetime import timedelta
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.sql import SqlSensor
 
# Wait for a file to appear
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file',
    poke_interval=60,  # check every minute
    timeout=60*60*5,   # timeout after 5 hours
    mode='poke',       # hold the worker slot and sleep between checks
    dag=dag,
)
 
# Wait for another DAG's task to complete
external_task_sensor = ExternalTaskSensor(
    task_id='wait_for_other_dag_task',
    external_dag_id='other_dag',
    external_task_id='final_task',
    execution_delta=timedelta(hours=1),  # look for runs 1 hour earlier
    dag=dag,
)
 
# Wait for a SQL query to return a non-empty result
sql_sensor = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql="SELECT 1 FROM my_table WHERE date = '{{ ds }}' LIMIT 1",
    dag=dag,
)

TaskFlow API (Airflow 2.0+)

In newer versions of Airflow, you can use the TaskFlow API to create tasks with less boilerplate:

from airflow.decorators import dag, task
from datetime import datetime
 
@dag(
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False
)
def taskflow_example_dag():
    """Example DAG using the TaskFlow API."""
    
    @task
    def extract():
        """Extract data task."""
        data = {'order_id': 42, 'customer_id': 100}
        return data
    
    @task
    def transform(order_data):
        """Transform task that takes the output of extract as input."""
        order_data['order_value'] = 100.0
        return order_data
    
    @task
    def load(order_data):
        """Load task that takes the output of transform as input."""
        print(f"Order data: {order_data}")
    
    # Define the task dependencies
    load(transform(extract()))
 
# Create the DAG
taskflow_dag = taskflow_example_dag()

This approach automatically handles passing data between tasks using XComs.


XComs: Passing Data Between Tasks

XComs (short for “cross-communications”) let tasks exchange small amounts of data:

Basic XCom Usage

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
 
def push_function(**context):
    """Pushes a value to XCom."""
    context['ti'].xcom_push(key='sample_value', value=42)
    # Return values are automatically pushed to XCom with key 'return_value'
    return "This also gets pushed to XCom"
 
def pull_function(**context):
    """Pulls values from XCom."""
    # Pull value with specific key
    value1 = context['ti'].xcom_pull(task_ids='push_task', key='sample_value')
    # Pull return value
    value2 = context['ti'].xcom_pull(task_ids='push_task')
    print(f"Pulled values: {value1}, {value2}")
 
with DAG('xcom_example', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
    )
    
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
    )
    
    push_task >> pull_task

XCom with TaskFlow API

The TaskFlow API makes working with XComs even easier:

from airflow.decorators import dag, task
from datetime import datetime
 
@dag(start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False)
def xcom_taskflow_example():
    
    @task
    def extract():
        return {"value": 42}
    
    @task
    def process(data):
        return {
            "processed_value": data["value"] * 10
        }
    
    @task
    def load(data):
        print(f"Final value: {data['processed_value']}")
    
    # Define the workflow
    data = extract()
    processed_data = process(data)
    load(processed_data)
 
xcom_dag = xcom_taskflow_example()

XCom Limitations and Best Practices

XComs are stored in Airflow’s metadata database and are not designed for large data transfer:

  1. Size limitations: XComs are meant for small amounts of data (typically < 1MB).
  2. Performance impact: Large XComs can impact Airflow’s performance.
  3. Serialization: Data must be serializable (Airflow 2 uses JSON by default; pickling is available only if enable_xcom_pickling is turned on).

For large data, consider these alternatives:

  • Use intermediate storage like S3, HDFS, or a database
  • Use Airflow connections to access shared resources
  • Pass identifiers or file paths instead of the actual data

Avoid using XComs for large datasets, sensitive information, or as a primary data storage mechanism.
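
One way to enforce these guidelines is to check a value before returning it from a task. The helper below is a minimal sketch, assuming the JSON serialization that Airflow 2 uses unless pickling is enabled; check_xcom_value and the 1 MB budget are hypothetical choices, not Airflow settings.

```python
import json

MAX_XCOM_BYTES = 1024 * 1024  # assumed 1 MB budget, not an Airflow setting


def check_xcom_value(value, max_bytes=MAX_XCOM_BYTES):
    """Return the JSON-encoded size of value, or raise if unsuitable for XCom."""
    try:
        encoded = json.dumps(value).encode("utf-8")
    except TypeError as exc:
        raise ValueError(f"Value is not JSON-serializable: {exc}") from exc
    if len(encoded) > max_bytes:
        raise ValueError(f"Value is {len(encoded)} bytes; pass a reference instead")
    return len(encoded)


# A path or ID is small and serializable; the dataset itself stays in storage.
print(check_xcom_value({"file_path": "s3://my-bucket/data/20230101.parquet"}))
```

Calling such a check at the end of a task callable turns an oversized or non-serializable return value into an explicit error rather than a slow metadata database.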


Sensors and Hooks

Sensors in Depth

Sensors are special operators that wait for conditions to be met:

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta
 
with DAG('sensor_examples', start_date=datetime(2023, 1, 1)) as dag:
    
    # Wait for a file to appear
    file_sensor = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data.csv',
        poke_interval=60,  # seconds between checks
        timeout=3600,       # timeout after 1 hour
    )
    
    # Wait for a specified amount of time
    time_sensor = TimeDeltaSensor(
        task_id='wait_1_hour',
        delta=timedelta(hours=1),
    )
    
    # Wait for an HTTP endpoint to return a 200
    http_sensor = HttpSensor(
        task_id='check_api',
        http_conn_id='http_default',
        endpoint='api/v1/status',
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,
        timeout=3600,
    )

Sensor Modes

Sensors have different modes that affect how they consume resources:

  • poke (default): The sensor takes up a worker slot for its entire execution
  • reschedule: The sensor releases the worker slot between checks
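
The mode parameter accepts only poke and reschedule; there is no smart mode. Smart sensors were a separate experimental feature that was removed in Airflow 2.4 in favor of deferrable operators.

file_sensor = FileSensor(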
    task_id='efficient_file_sensor',
    filepath='/tmp/data.csv',
    mode='reschedule',  # Release the worker between checks
    poke_interval=300,  # Check every 5 minutes
    dag=dag,
)
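
The interplay of poke_interval and timeout in poke mode can be modeled as a simple loop. This is a plain-Python sketch with hypothetical names (run_sensor and its clock and sleep parameters exist only to make it testable), not Airflow's sensor implementation.

```python
import time


def run_sensor(poke, poke_interval, timeout, clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True or timeout seconds have elapsed."""
    started = clock()
    while not poke():
        if clock() - started >= timeout:
            raise TimeoutError(f"Condition not met within {timeout} seconds")
        # In poke mode the worker slot stays occupied during this sleep;
        # reschedule mode would instead free the slot and come back later.
        sleep(poke_interval)
    return True


# The condition becomes true on the third check.
attempts = iter([False, False, True])
print(run_sensor(lambda: next(attempts), poke_interval=0, timeout=5))
```

The sleep inside the loop is the cost that reschedule mode avoids: for long poke intervals, the worker would otherwise sit idle while holding a slot.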

Hooks: Connecting to External Systems

Hooks provide a standardized interface to external systems:

from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
 
def hook_example(**context):
    # Get a connection by ID
    conn = BaseHook.get_connection('my_postgres_conn')
    print(f"Host: {conn.host}, Login: {conn.login}")
    
    # Use PostgreSQL hook
    pg_hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    records = pg_hook.get_records("SELECT * FROM users LIMIT 5")
    
    # Use S3 hook
    s3_hook = S3Hook(aws_conn_id='my_aws_conn')
    file_content = s3_hook.read_key(key='data.csv', bucket_name='my-bucket')
    
    # Use HTTP hook
    http_hook = HttpHook(http_conn_id='my_api_conn', method='GET')
    response = http_hook.run('api/v1/data')

Creating Custom Hooks

You can create custom hooks to encapsulate complex connection logic:

from airflow.hooks.base import BaseHook
import requests
 
class MyAPIHook(BaseHook):
    """Hook to interact with a custom API."""
    
    def __init__(self, api_conn_id='my_api_default'):
        super().__init__()
        self.api_conn_id = api_conn_id
        self.connection = self.get_connection(api_conn_id)
        self.base_url = self.connection.host
        self.api_key = self.connection.password
    
    def get_data(self, endpoint, params=None):
        """Get data from the API."""
        url = f"{self.base_url}/{endpoint}"
        headers = {"Authorization": f"Bearer {self.api_key}"}
        
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        return response.json()
    
    def post_data(self, endpoint, data):
        """Post data to the API."""
        url = f"{self.base_url}/{endpoint}"
        headers = {"Authorization": f"Bearer {self.api_key}"}
        
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        return response.json()

Using your custom hook:

def use_my_api(**context):
    hook = MyAPIHook(api_conn_id='my_api_conn')
    data = hook.get_data('users', params={'limit': 10})
    print(f"Retrieved {len(data)} users")

Dynamic DAG Generation

One of Airflow’s powerful features is the ability to generate DAGs dynamically:

Generating Tasks in a Loop

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
 
with DAG('dynamic_tasks', start_date=datetime(2023, 1, 1)) as dag:
    
    def process_data(data_id, **context):
        print(f"Processing data for ID: {data_id}")
    
    # Generate tasks dynamically
    tasks = []
    for i in range(1, 6):
        task = PythonOperator(
            task_id=f'process_data_{i}',
            python_callable=process_data,
            op_kwargs={'data_id': i},
        )
        tasks.append(task)
    
    # Set dependencies
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]

Generating DAGs from Configuration

You can generate multiple DAGs from external configuration:

import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
 
# Load configuration
with open('/path/to/dags_config.yaml') as f:
    config = yaml.safe_load(f)
 
# Create a DAG for each configuration
for dag_config in config['dags']:
    dag_id = dag_config['id']
    
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=dag_config.get('schedule', '@daily'),
        start_date=datetime.fromisoformat(dag_config.get('start_date', '2023-01-01')),
        default_args=dag_config.get('default_args', {}),
    )
    
    # Add this DAG to the global namespace
    globals()[dag_id] = dag
    
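    # Create tasks for this DAG. Each 'function' name is looked up in this module,
    # so the callables must be defined or imported above this loop.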
    for task_config in dag_config['tasks']:
        task = PythonOperator(
            task_id=task_config['id'],
            python_callable=globals()[task_config['function']],
            op_kwargs=task_config.get('kwargs', {}),
            dag=dag,
        )

Example configuration file:

dags:
  - id: data_processing_dag
    schedule: '0 0 * * *'
    start_date: '2023-01-01'
    tasks:
      - id: extract_data
        function: extract_data_function
        kwargs:
          source: 'db1'
      - id: transform_data
        function: transform_data_function
      - id: load_data
        function: load_data_function
        kwargs:
          destination: 'warehouse'
  
  - id: reporting_dag
    schedule: '0 6 * * *'
    tasks:
      - id: generate_report
        function: generate_report_function
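
Because a mistake in the configuration only surfaces when Airflow parses the file, it can be worth validating the parsed dictionary first. The function below is a minimal sketch under the structure shown above (a top-level dags list whose entries have id and tasks); validate_dags_config is a hypothetical helper, not part of Airflow.

```python
def validate_dags_config(config):
    """Return a list of problems found in a parsed DAG configuration dict."""
    problems = []
    seen_dag_ids = set()
    for dag_config in config.get("dags", []):
        dag_id = dag_config.get("id")
        if not dag_id:
            problems.append("DAG entry without an id")
            continue
        if dag_id in seen_dag_ids:
            problems.append(f"duplicate DAG id: {dag_id}")
        seen_dag_ids.add(dag_id)

        seen_task_ids = set()
        for task_config in dag_config.get("tasks", []):
            task_id = task_config.get("id")
            if not task_id or "function" not in task_config:
                problems.append(f"{dag_id}: task needs both id and function")
            elif task_id in seen_task_ids:
                problems.append(f"{dag_id}: duplicate task id: {task_id}")
            seen_task_ids.add(task_id)
    return problems


# Deliberately broken example: a repeated DAG id and a task without a function.
config = {
    "dags": [
        {"id": "reporting_dag",
         "tasks": [{"id": "generate_report", "function": "generate_report_function"}]},
        {"id": "reporting_dag", "tasks": [{"id": "generate_report"}]},
    ]
}
print(validate_dags_config(config))
```

Running the check in a unit test or before the DAG-creation loop reports every problem at once instead of failing on the first bad entry during parsing.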

Factory Functions for DAG Creation

A cleaner pattern is to use factory functions:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
 
def create_data_processing_dag(dag_id, schedule, source_system, target_system):
    """Factory function to create a DAG."""
    
    default_args = {
        'owner': 'data_engineering',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=schedule,
        start_date=datetime(2023, 1, 1),
        catchup=False,
    )
    
    def extract(**context):
        print(f"Extracting data from {source_system}")
        return {"extracted_rows": 100}
    
    def transform(**context):
        ti = context['ti']
        extract_result = ti.xcom_pull(task_ids='extract')
        print(f"Transforming {extract_result['extracted_rows']} rows")
        return {"transformed_rows": extract_result['extracted_rows']}
    
    def load(**context):
        ti = context['ti']
        transform_result = ti.xcom_pull(task_ids='transform')
        print(f"Loading {transform_result['transformed_rows']} rows to {target_system}")
    
    with dag:
        extract_task = PythonOperator(
            task_id='extract',
            python_callable=extract,
        )
        
        transform_task = PythonOperator(
            task_id='transform',
            python_callable=transform,
        )
        
        load_task = PythonOperator(
            task_id='load',
            python_callable=load,
        )
        
        extract_task >> transform_task >> load_task
    
    return dag
 
# Create multiple DAGs
for source, target, schedule in [
    ('mysql', 'warehouse', '0 0 * * *'),
    ('postgres', 'warehouse', '0 6 * * *'),
    ('mongodb', 'warehouse', '0 12 * * *')
]:
    dag_id = f"{source}_to_{target}_dag"
    globals()[dag_id] = create_data_processing_dag(
        dag_id=dag_id,
        schedule=schedule,
        source_system=source,
        target_system=target
    )

Testing and Debugging

Testing DAGs is essential for reliable workflows. Here are several approaches:

Unit Testing Operator Logic

Test the functions that operators will call:

import unittest
 
def process_data(value):
    """Function to be called by a PythonOperator."""
    if value < 0:
        raise ValueError("Value cannot be negative")
    return value * 2
 
class TestProcessData(unittest.TestCase):
    def test_process_data_positive(self):
        """Test process_data with positive input."""
        result = process_data(5)
        self.assertEqual(result, 10)
    
    def test_process_data_negative(self):
        """Test process_data with negative input."""
        with self.assertRaises(ValueError):
            process_data(-1)

Testing DAG Structure

Test that your DAG structure is correct:

import unittest
from airflow.models import DagBag
 
class TestDagValidation(unittest.TestCase):
    def setUp(self):
        # Skip Airflow's bundled example DAGs so only your own DAGs are checked
        self.dagbag = DagBag(include_examples=False)
    
    def test_dags_load_without_errors(self):
        """Verify all DAGs can be imported without errors."""
        self.assertEqual(
            len(self.dagbag.import_errors),
            0,
            f"DAG import errors: {self.dagbag.import_errors}"
        )
    
    def test_dag_structure(self):
        """Test the structure of a specific DAG."""
        # One of the DAGs produced by the factory function shown earlier
        dag_id = 'mysql_to_warehouse_dag'
        dag = self.dagbag.get_dag(dag_id)
        
        # Make sure the DAG exists
        self.assertIsNotNone(dag)
        
        # Test task count
        tasks = dag.tasks
        self.assertEqual(len(tasks), 3, f"Expected 3 tasks, found {len(tasks)}")
        
        # Test specific task existence
        task_ids = [task.task_id for task in tasks]
        self.assertIn('extract', task_ids)
        self.assertIn('transform', task_ids)
        self.assertIn('load', task_ids)
        
        # Test task dependencies
        upstream_task_ids = {
            task.task_id: [t.task_id for t in task.upstream_list] 
            for task in tasks
        }
        self.assertEqual(upstream_task_ids['extract'], [])
        self.assertEqual(upstream_task_ids['transform'], ['extract'])
        self.assertEqual(upstream_task_ids['load'], ['transform'])

Mocking External Dependencies

When testing tasks that interact with external systems, use mocks:

import unittest
from unittest.mock import patch, MagicMock
 
def fetch_data_from_api(api_endpoint, **context):
    """Function that calls an external API."""
    import requests
    response = requests.get(api_endpoint)
    response.raise_for_status()
    return response.json()
 
class TestApiTask(unittest.TestCase):
    @patch('requests.get')
    def test_fetch_data_from_api(self, mock_get):
        """Test the function that fetches API data."""
        # Configure the mock
        mock_response = MagicMock()
        mock_response.json.return_value = {'data': [1, 2, 3]}
        mock_response.raise_for_status.return_value = None
        mock_get.return_value = mock_response
        
        # Test the function
        result = fetch_data_from_api('https://example.com/api')
        
        # Assertions
        mock_get.assert_called_once_with('https://example.com/api')
        self.assertEqual(result, {'data': [1, 2, 3]})

Using the Airflow CLI for Testing

The Airflow CLI provides tools for testing:

# Test a specific task (runs it without recording state in the database)
airflow tasks test example_dag print_date 2023-01-01
 
# Test DAG parsing
airflow dags list
airflow dags list-import-errors
 
# Backfill a date range to test full DAG execution
airflow dags backfill --start-date 2023-01-01 --end-date 2023-01-02 example_dag

Debugging with Airflow

Strategies for debugging Airflow DAGs:

  1. Local execution: Run tasks with the CLI to debug issues.
  2. Incremental development: Add and test tasks one by one.
  3. Use print statements: Include print statements in Python callables for visibility.
  4. Leverage Airflow logs: Check logs in the UI or on disk.
  5. XCom inspection: Use XComs to inspect intermediate values.

Advanced debugging example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import logging
 
logger = logging.getLogger(__name__)
 
def some_calculation():
    """Placeholder for the real computation (hypothetical)."""
    return 42

def debug_task(**context):
    """Task with extensive debugging."""
    logger.info("Task started")
    
    # Log the task context
    logger.info(f"Execution date: {context['ds']}")
    logger.info(f"Task instance: {context['ti']}")
    
    # Log the connection IDs stored in the metadata database
    from airflow.models import Connection
    from airflow.utils.session import create_session
    with create_session() as session:
        conn_ids = [c.conn_id for c in session.query(Connection).all()]
    logger.info(f"Available connections: {conn_ids}")
    
    # Log environment variable names only; the values may contain secrets
    import os
    logger.info(f"Environment variables: {sorted(os.environ)}")
    
    # Log intermediate results
    result = some_calculation()
    logger.info(f"Intermediate result: {result}")
    
    # Push debugging info to XCom (no environment values: XComs are visible in the UI)
    context['ti'].xcom_push(key='debug_info', value={'result': result})
    
    return result
 
with DAG('debug_dag', start_date=datetime(2023, 1, 1)) as dag:
    debug = PythonOperator(
        task_id='debug_task',
        python_callable=debug_task,
    )

Performance Optimization

As your Airflow deployment grows, performance optimization becomes important:

DAG Performance Best Practices

  1. Minimize DAG file parse time:
    • Avoid expensive imports at module level
    • Keep top-level code lightweight
    • Cache expensive computations

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
 
# BAD: Expensive operation at module level
# import pandas as pd
# df = pd.read_csv('large_file.csv')  # This runs every time Airflow parses the DAG!
 
# GOOD: Move expensive operations inside task functions
def process_data(**context):
    import pandas as pd  # Import only when task executes
    df = pd.read_csv('large_file.csv')
    # Process data...
    return df.shape[0]
 
with DAG('optimized_dag', start_date=datetime(2023, 1, 1)) as dag:
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

  2. Optimize task scheduling:
    • Use appropriate depends_on_past settings
    • Set concurrency limits for DAGs and tasks
    • Use pools to manage resource contention

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
 
with DAG(
    'resource_optimized_dag',
    start_date=datetime(2023, 1, 1),
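    # Prevent multiple concurrent runs of this DAG
    max_active_runs=1,
    # Limit how many task instances of this DAG run at once
    # (max_active_tasks replaces the deprecated concurrency argument)
    max_active_tasks=5,
    default_args={
        # Run a task only if its instance in the previous DAG run succeeded
        'depends_on_past': True,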
        # Back off exponentially on retries
        'retry_exponential_backoff': True,
        'retries': 3,
        'retry_delay': timedelta(minutes=1),
    }
) as dag:
    
    # Resource-intensive task
    heavy_task = PythonOperator(
        task_id='heavy_processing',
        python_callable=lambda: None,
        # Assign to a resource pool
        pool='cpu_intensive_pool',
        # Set a higher priority (higher number = higher priority)
        priority_weight=10,
        # Set execution timeout
        execution_timeout=timedelta(hours=1),
    )
  3. Data transfer optimization:
    • Avoid large XComs
    • Use external storage for data transfer
    • Pass references (paths, IDs) instead of data
def extract(**context):
    """Extract data and save to a file."""
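    data = fetch_large_dataset()  # placeholder for your own extraction logic
    # With distributed executors, write to shared storage (for example S3 or NFS)
    # rather than a worker-local /tmp, so downstream tasks can read the file
    file_path = f"/tmp/data_{context['ds_nodash']}.parquet"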
    
    # Save to disk instead of using XCom
    data.to_parquet(file_path)
    
    # Push only the file path to XCom
    return {'file_path': file_path}
 
def transform(**context):
    """Transform using file path from previous task."""
    ti = context['ti']
    file_info = ti.xcom_pull(task_ids='extract')
    
    import pandas as pd
    data = pd.read_parquet(file_info['file_path'])
    # Transform data...
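
To check the first practice in this list (DAG file parse time), it can help to measure how long a file's top-level code takes to run. The following is a minimal sketch using only the standard library; the helper name time_module_import and the sample file are hypothetical, and the measurement only approximates what the Airflow DAG processor does.

```python
import importlib.util
import tempfile
import time
from pathlib import Path


def time_module_import(path: str) -> float:
    """Return the seconds taken to execute a Python file's top-level code."""
    file_path = Path(path)
    # Load the file under a throwaway module name so it is not cached in sys.modules.
    spec = importlib.util.spec_from_file_location(f"_parse_check_{file_path.stem}", file_path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # runs the module's top-level statements
    return time.perf_counter() - start


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        slow_file = Path(tmp) / "slow_dag.py"
        # The sleep stands in for an expensive module-level call such as pd.read_csv.
        slow_file.write_text("import time\ntime.sleep(0.2)\n")
        print(f"{slow_file.name}: {time_module_import(str(slow_file)):.2f}s")
```

Files that take more than a fraction of a second here are good candidates for moving work inside task functions.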

Executor Configuration

Choose the right executor for your workload:

  1. LocalExecutor: Good for small deployments
  2. CeleryExecutor: Distributed execution for scaling
  3. KubernetesExecutor: Dynamic scaling with Kubernetes

Example CeleryExecutor configuration:

# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
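
In containerized deployments the same options are often supplied as environment variables, which Airflow reads using the AIRFLOW__{SECTION}__{KEY} naming convention (the executor option belongs to the core section, the Celery options to the celery section). The sketch below derives those names; the helper airflow_env_var is a hypothetical convenience, not part of Airflow.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The CeleryExecutor settings from the example above, expressed as environment overrides.
overrides = {
    airflow_env_var("core", "executor"): "CeleryExecutor",
    airflow_env_var("celery", "broker_url"): "redis://redis:6379/0",
    airflow_env_var("celery", "worker_concurrency"): "16",
}

for name, value in overrides.items():
    print(f"export {name}={value}")
```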

Database Optimization

The metadata database can become a bottleneck:

  1. Regular maintenance:

    • Configure log cleanups
    • Set appropriate history retention
    • Run database vacuum regularly
  2. Indexing:

    • Add custom indexes for frequent queries
    • Monitor slow queries
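-- Add index for task instance lookups
-- (on Airflow 2.2+, task_instance is keyed by run_id rather than execution_date,
-- so check your schema version before creating this index)
CREATE INDEX IF NOT EXISTS ti_dag_date ON task_instance (dag_id, execution_date);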
 
-- Add index for task failure lookups
CREATE INDEX IF NOT EXISTS ti_state ON task_instance (state);
  3. Connection pooling:
    • Use PgBouncer for PostgreSQL
    • Tune connection pool settings
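
For history retention, one option is the airflow db clean CLI command (available from Airflow 2.3), which purges metadata rows older than a cutoff. The sketch below only builds the command line for a given retention window; the helper name and the 90-day window are assumptions for illustration, and the command is printed rather than executed.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def build_db_clean_command(retention_days: int, dry_run: bool = True,
                           now: Optional[datetime] = None) -> List[str]:
    """Build an `airflow db clean` command that targets rows older than the retention window."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    command = ["airflow", "db", "clean", "--clean-before-timestamp", cutoff.isoformat()]
    if dry_run:
        command.append("--dry-run")  # report what would be deleted without deleting
    else:
        command.append("--yes")      # skip the interactive confirmation prompt
    return command


# A fixed "now" keeps the example reproducible.
print(" ".join(build_db_clean_command(90, now=datetime(2024, 1, 1, tzinfo=timezone.utc))))
```

Running with the dry-run flag first is a cautious default, since the deletion cannot be undone without a database backup.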

Security Best Practices

Securing your Airflow deployment is critical:

Secure Connections and Variables

Sensitive credentials should be securely stored:

from airflow.models import Variable
from airflow.hooks.base import BaseHook
 
# Create encrypted connections via UI or CLI
# airflow connections add 'my_db' --conn-uri 'postgresql://user:pass@host:port/db'
 
def secure_task(**context):
    # Retrieve securely stored connection
    conn = BaseHook.get_connection('my_db')
    print(f"Connected to {conn.host} as {conn.login}")
    
    # Retrieve securely stored variable
    api_key = Variable.get("api_key")
    # Use but don't print the key

Never hardcode credentials in DAG files!
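
The same care applies to logging: a connection URI should have its password masked before it reaches a log line. The following is a small sketch using only the standard library; redact_uri is a hypothetical helper, not an Airflow function, and it does not handle every URI form (for example IPv6 hosts).

```python
from urllib.parse import urlsplit, urlunsplit


def redact_uri(uri: str) -> str:
    """Return the URI with any password replaced by '***' so it is safe to log."""
    parts = urlsplit(uri)
    if parts.password is None:
        return uri  # nothing to hide
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    netloc = f"{parts.username or ''}:***@{host}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


print(redact_uri("postgresql://user:pass@host:5432/db"))
```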

Fernet Key for Encryption

Airflow uses Fernet for encrypting sensitive information:

# Generate a Fernet key
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
 
# Set in airflow.cfg
[core]
fernet_key = your_generated_key_here
 
# Or as an environment variable
export AIRFLOW__CORE__FERNET_KEY=your_generated_key_here
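
A Fernet key is 32 random bytes encoded as URL-safe base64, so a placeholder such as your_generated_key_here will not work. The sketch below checks that format with the standard library before a deployment; both function names are hypothetical, and in practice generating the key with the cryptography package as shown above remains the safer choice.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Generate a key in the Fernet format: 32 random bytes, URL-safe base64 encoded."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


def is_valid_fernet_key(key: str) -> bool:
    """Check that a key decodes to exactly 32 bytes, as the Fernet format requires."""
    try:
        return len(base64.urlsafe_b64decode(key.encode())) == 32
    except (ValueError, TypeError):
        return False


print(is_valid_fernet_key(generate_fernet_key()))
print(is_valid_fernet_key("your_generated_key_here"))
```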

Role-Based Access Control (RBAC)

Configure user roles and permissions:

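# Illustrative role-to-permission mapping (not an Airflow API).
# Airflow 2 ships with built-in roles (Admin, Op, User, Viewer, Public) that are
# managed in the UI or with the `airflow roles` and `airflow users` CLI commands.
ROLES = {
    'Admin': {'permissions': ['can_create', 'can_read', 'can_edit', 'can_delete', 'can_pause']},
    'Editor': {'permissions': ['can_read', 'can_edit']},
    'Viewer': {'permissions': ['can_read']},
}
 
# Airflow 1.10.x only: enable the RBAC UI in airflow.cfg.
# Airflow 2.x always uses the RBAC UI and no longer has this option.
[webserver]
rbac = True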
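
To make the role model concrete, the sketch below treats a mapping like the one above as plain data and checks a permission against it. This is an illustration of the idea only: has_permission is a hypothetical helper, and Airflow's own access control is implemented differently.

```python
# Illustrative roles, mirroring the example mapping above.
ROLES = {
    "Admin": {"can_create", "can_read", "can_edit", "can_delete", "can_pause"},
    "Editor": {"can_read", "can_edit"},
    "Viewer": {"can_read"},
}


def has_permission(role: str, permission: str) -> bool:
    """Return True if the role grants the permission; unknown roles get nothing."""
    return permission in ROLES.get(role, set())


print(has_permission("Editor", "can_edit"))
print(has_permission("Viewer", "can_delete"))
```

Denying by default for unknown roles keeps a misconfigured user from gaining access by accident.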

Limiting Task Access

Restrict what tasks can do:

  1. Run as a different user:

    from airflow.operators.bash import BashOperator
     
    task = BashOperator(
        task_id='restricted_task',
        bash_command='whoami',
        run_as_user='airflow',
    )
  2. Use a Docker operator for isolation:

    from airflow.providers.docker.operators.docker import DockerOperator
     
    task = DockerOperator(
        task_id='dockerized_task',
        image='python:3.9-slim',
        command='python -c "print(\'Hello\')"',
        auto_remove=True,
        docker_url='unix://var/run/docker.sock',
    )
  3. Kubernetes Pod Operator for stronger isolation:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
     
    task = KubernetesPodOperator(
        task_id='k8s_task',
        name='pod-task',
        namespace='default',
        image='python:3.9-slim',
        cmds=["python", "-c"],
        arguments=["print('Hello')"],
        service_account_name='airflow',
        security_context={
            'runAsUser': 1000,
            'allowPrivilegeEscalation': False,
        },
    )

Network Security

Secure your Airflow components:

# In airflow.cfg
[webserver]
web_server_ssl_cert = /path/to/cert.pem
web_server_ssl_key = /path/to/key.pem
 
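# Enable authentication for the REST API (this option lives in the [api] section;
# Airflow 2.3+ names it auth_backends, earlier 2.x releases use auth_backend)
[api]
auth_backends = airflow.api.auth.backend.basic_auth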

Production Deployment

When deploying Airflow to production, consider these best practices:

Containerization with Docker

Using Docker provides consistency and isolation:

# Example Dockerfile for a custom Airflow image
FROM apache/airflow:2.7.1
 
USER root
# Install additional system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    && apt-get clean && rm -rf /var/lib/apt/lists/*
 
USER airflow
# Install Python packages
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
 
# Copy custom DAGs, plugins, etc.
COPY --chown=airflow:root dags /opt/airflow/dags
COPY --chown=airflow:root plugins /opt/airflow/plugins

Kubernetes Deployment

For production-grade deployments, Kubernetes with the official Helm chart is recommended:

# Add the Airflow Helm repository
helm repo add apache-airflow https://airflow.apache.org
helm repo update
 
# Create a values file for customization
cat > custom-values.yaml << EOL
executor: CeleryExecutor
postgresql:
  enabled: true
redis:
  enabled: true
webserver:
  replicas: 2
scheduler:
  replicas: 2
workers:
  replicas: 3
dags:
  persistence:
    enabled: true
    size: 1Gi
logs:
  persistence:
    enabled: true
    size: 5Gi
EOL
 
# Install Airflow
helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  -f custom-values.yaml

Continuous Deployment for DAGs

Implement CI/CD for your DAGs:

  1. Version control: Store DAGs in a Git repository
  2. CI checks: Run linting and tests on pull requests
  3. CD pipeline: Automatically deploy to production

Example GitHub Actions workflow:

name: Airflow DAG CI/CD
 
on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main
 
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install apache-airflow pytest pylint
          pip install -r requirements.txt
      
      - name: Lint with pylint
        run: |
          pylint --disable=C0111,C0103 dags/
      
      - name: Test with pytest
        run: |
          pytest tests/
  
  deploy:
    needs: test
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      
      - name: Deploy DAGs to S3
        run: |
          aws s3 sync dags/ s3://my-airflow-dags-bucket/dags/

Monitoring and Alerting

Set up comprehensive monitoring:

  1. Metrics with StatsD (Prometheus can scrape them through a StatsD exporter):

    # airflow.cfg
    [metrics]
    statsd_on = True
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow
  2. Visualization with Grafana:

    • Create dashboards for DAG performance
    • Track task successes/failures
    • Monitor system resources
  3. Alerting:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.email import EmailOperator
    from datetime import datetime
     
    with DAG('monitored_dag', start_date=datetime(2023, 1, 1)) as dag:
        # Critical task
        task = PythonOperator(
            task_id='critical_task',
            python_callable=lambda: None,
            email_on_failure=True,
            email='alerts@example.com',
        )
        
        # Custom alerting logic
        alert = EmailOperator(
            task_id='alert_ops',
            to='ops@example.com',
            subject='DAG {{ dag.dag_id }} {{ dag_run.run_id }} - Issue detected',
            html_content='Task {{ ti.task_id }} has encountered a problem.',
            trigger_rule='one_failed',
        )
        
        task >> alert
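
Another common alerting hook is a task's on_failure_callback argument, which receives the task context when a task fails. The sketch below shows one possible callback and exercises it with a simulated context; the function names and message format are assumptions, and the print call stands in for whatever notification system you use.

```python
from types import SimpleNamespace


def format_failure_alert(context: dict) -> str:
    """Build a one-line alert message from an Airflow task context."""
    ti = context["task_instance"]
    error = context.get("exception", "unknown error")
    return f"[ALERT] {ti.dag_id}.{ti.task_id} failed on try {ti.try_number}: {error}"


def notify_on_failure(context: dict) -> None:
    """Callback suitable for passing as on_failure_callback."""
    # Replace print with a call to your chat, paging, or email system.
    print(format_failure_alert(context))


# Simulated context for a quick local check; Airflow supplies the real one.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="monitored_dag", task_id="critical_task", try_number=1),
    "exception": ValueError("boom"),
}
notify_on_failure(fake_context)
```

Keeping the message formatting in its own function makes it easy to unit test without running a DAG.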

Advanced Patterns

Take your Airflow skills to the next level with these advanced patterns:

Branching and Conditional Execution

Control task execution flow with branching:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
 
def _choose_path(**context):
    """Decide which path to take based on execution date."""
    if context['execution_date'].day % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'
 
with DAG('branching_dag', start_date=datetime(2023, 1, 1)) as dag:
    branch = BranchPythonOperator(
        task_id='branch_task',
        python_callable=_choose_path,
    )
    
    even_day = PythonOperator(
        task_id='even_day_task',
        python_callable=lambda: print("Even day processing"),
    )
    
    odd_day = PythonOperator(
        task_id='odd_day_task',
        python_callable=lambda: print("Odd day processing"),
    )
    
    join = PythonOperator(
        task_id='join_task',
        python_callable=lambda: print("Processing complete"),
        trigger_rule='one_success',  # Run after any upstream task succeeds
    )
    
    branch >> [even_day, odd_day] >> join
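
The join task needs a non-default trigger rule because the branch always skips one of its two upstream tasks. The following simplified model (not Airflow's scheduler code, and assuming every upstream task has already finished) shows how three common trigger rules evaluate that situation.

```python
from typing import List


def should_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Simplified evaluation of a trigger rule once all upstream tasks are done."""
    succeeded = upstream_states.count("success")
    failed = sum(state in ("failed", "upstream_failed") for state in upstream_states)
    if trigger_rule == "all_success":
        return succeeded == len(upstream_states)
    if trigger_rule == "one_success":
        return succeeded >= 1
    if trigger_rule == "none_failed_min_one_success":
        return failed == 0 and succeeded >= 1
    raise ValueError(f"Unsupported trigger rule: {trigger_rule}")


# After branching, one path succeeds and the other is skipped.
states_after_branch = ["success", "skipped"]
for rule in ("all_success", "one_success", "none_failed_min_one_success"):
    print(rule, should_run(rule, states_after_branch))
```

With the default all_success rule the join would not run, which is why the example sets trigger_rule explicitly.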

Task Groups for Organization

Organize related tasks together:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
 
with DAG('task_group_dag', start_date=datetime(2023, 1, 1)) as dag:
    start = PythonOperator(
        task_id='start',
        python_callable=lambda: print("Starting pipeline"),
    )
    
    # Create a task group for extract tasks
    with TaskGroup(group_id='extract_tasks') as extract_group:
        extract_a = PythonOperator(
            task_id='extract_a',
            python_callable=lambda: print("Extracting A"),
        )
        
        extract_b = PythonOperator(
            task_id='extract_b',
            python_callable=lambda: print("Extracting B"),
        )
    
    # Create a task group for transform tasks
    with TaskGroup(group_id='transform_tasks') as transform_group:
        transform_a = PythonOperator(
            task_id='transform_a',
            python_callable=lambda: print("Transforming A"),
        )
        
        transform_b = PythonOperator(
            task_id='transform_b',
            python_callable=lambda: print("Transforming B"),
        )
    
    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print("Loading data"),
    )
    
    # Set dependencies
    start >> extract_group >> transform_group >> load
    
    # You can also set dependencies between individual tasks in different groups
    # extract_a >> transform_a
    # extract_b >> transform_b

Custom Operators

Create reusable task components:

from airflow.models.baseoperator import BaseOperator
 
class DataQualityOperator(BaseOperator):
    """
    Custom operator for data quality checks.
    """
    
    # Note: the @apply_defaults decorator is no longer needed in Airflow 2
    def __init__(
        self,
        conn_id,
        table,
        checks,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.table = table
        self.checks = checks
    
    def execute(self, context):
        """Execute data quality checks."""
        self.log.info(f"Running data quality checks on {self.table}")
        
        # Use a hook to connect to the database
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        
        failed_tests = []
        for check in self.checks:
            # Each check is a dictionary with query and expected result
            query = check['query']
            expected = check['expected']
            
            self.log.info(f"Running check: {query}")
            records = hook.get_records(query)
            
            if not records or not records[0]:
                failed_tests.append(f"Check returned no results: {query}")
                continue
            
            actual = records[0][0]
            self.log.info(f"Check result: {actual}")
            
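            # `expected` may be a literal value or a predicate function
            passed = expected(actual) if callable(expected) else actual == expected
            if not passed:
                failed_tests.append(f"Check failed: {query}, expected {expected}, got {actual}")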
        
        if failed_tests:
            self.log.error("Data quality checks failed!")
            self.log.error("\n".join(failed_tests))
            raise ValueError("Data quality checks failed")
        
        self.log.info("All data quality checks passed!")

Using the custom operator:

from airflow import DAG
from datetime import datetime
 
with DAG('data_quality_dag', start_date=datetime(2023, 1, 1)) as dag:
    dq_check = DataQualityOperator(
        task_id='check_users_table',
        conn_id='my_postgres',
        table='users',
        checks=[
            {
                'query': "SELECT COUNT(*) FROM users WHERE email IS NULL",
                'expected': 0
            },
            {
                'query': "SELECT COUNT(*) FROM users",
                'expected': lambda x: x > 0  # Can also use a function
            }
        ],
    )
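
Check logic like this is easier to trust when it can be tested without a database. The sketch below pulls the comparison into a plain function and feeds it a stub in place of the hook's get_records method; the function name run_checks and the fake result values are hypothetical.

```python
from typing import Callable, List


def run_checks(get_records: Callable[[str], list], checks: List[dict]) -> List[str]:
    """Run each check and return failure messages (an empty list means all passed)."""
    failures = []
    for check in checks:
        records = get_records(check["query"])
        if not records or not records[0]:
            failures.append(f"No results: {check['query']}")
            continue
        actual, expected = records[0][0], check["expected"]
        # Accept either a literal value or a predicate function.
        passed = expected(actual) if callable(expected) else actual == expected
        if not passed:
            failures.append(f"Check failed: {check['query']} (got {actual})")
    return failures


# Stub standing in for a database hook, so no connection is needed.
fake_results = {
    "SELECT COUNT(*) FROM users": [(42,)],
    "SELECT COUNT(*) FROM users WHERE email IS NULL": [(3,)],
}
failures = run_checks(fake_results.get, [
    {"query": "SELECT COUNT(*) FROM users WHERE email IS NULL", "expected": 0},
    {"query": "SELECT COUNT(*) FROM users", "expected": lambda count: count > 0},
])
print(failures)
```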

Subdag Pattern (Legacy)

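SubDAGs are deprecated (SubDagOperator has been deprecated since Airflow 2.0), and TaskGroups are the preferred replacement. The example below is included only for reading and maintaining older DAGs.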

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime
 
def load_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
    """Creates a subdag for loading data."""
    dag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        start_date=start_date,
        schedule_interval=schedule_interval,
    )
    
    with dag:
        load_a = PythonOperator(
            task_id='load_a',
            python_callable=lambda: print("Loading A"),
        )
        
        load_b = PythonOperator(
            task_id='load_b',
            python_callable=lambda: print("Loading B"),
        )
    
    return dag
 
with DAG('main_dag', start_date=datetime(2023, 1, 1)) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=lambda: print("Extracting data"),
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=lambda: print("Transforming data"),
    )
    
    load = SubDagOperator(
        task_id='load',
        subdag=load_subdag(
            parent_dag_id='main_dag',
            child_dag_id='load',
            start_date=dag.start_date,
            schedule_interval=dag.schedule_interval,
        ),
    )
    
    extract >> transform >> load

Triggers

Create event-based workflows:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
 
# DAG 1: Triggers DAG 2 on completion
with DAG('trigger_dag', start_date=datetime(2023, 1, 1)) as dag1:
    task = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data"),
    )
    
    trigger = TriggerDagRunOperator(
        task_id='trigger_reporting',
        trigger_dag_id='reporting_dag',
        execution_date='{{ ds }}',  # Pass the same execution date
        reset_dag_run=True,         # If the target DAG run exists, reset it
        wait_for_completion=True,   # Wait for the triggered DAG to complete
    )
    
    task >> trigger
 
# DAG 2: The reporting DAG
with DAG('reporting_dag', start_date=datetime(2023, 1, 1)) as dag2:
    # Wait for the first DAG to complete
    wait = ExternalTaskSensor(
        task_id='wait_for_processing',
        external_dag_id='trigger_dag',
        external_task_id='process_data',
        execution_date_fn=lambda dt: dt,  # Use the same execution date
        mode='reschedule',
        timeout=3600,
    )
    
    report = PythonOperator(
        task_id='generate_report',
        python_callable=lambda: print("Generating report"),
    )
    
    wait >> report
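
When the two DAGs run on different schedules, the function passed as execution_date_fn has to translate the sensor's logical date into the upstream run's logical date. The sketch below shows such a mapping as a plain function; the one-hour offset is an assumption for illustration, and the function name is hypothetical.

```python
from datetime import datetime, timedelta


def upstream_logical_date(logical_date: datetime) -> datetime:
    """Map this DAG's logical date to the upstream run the sensor should wait for."""
    # Assumption: the upstream DAG is scheduled one hour earlier than this one.
    return logical_date - timedelta(hours=1)


print(upstream_logical_date(datetime(2023, 1, 1, 6, 0)))
```

It would be passed as execution_date_fn=upstream_logical_date. For a fixed offset like this one, the sensor's execution_delta argument (here timedelta(hours=1)) expresses the same thing more briefly.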

Troubleshooting Common Issues

When things go wrong, use these troubleshooting steps:

DAG Not Appearing in UI

  1. Check file permissions:

    ls -la $AIRFLOW_HOME/dags/
  2. Verify DAG file syntax:

    python $AIRFLOW_HOME/dags/your_dag_file.py
  3. Check the scheduler logs:

    cat $AIRFLOW_HOME/logs/scheduler/*.log | grep "your_dag_id"
  4. Verify DAG directory configuration:

    grep "dags_folder" $AIRFLOW_HOME/airflow.cfg
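
The syntax check in step 2 can be applied to a whole DAG folder at once. The following is a minimal sketch using the standard library's ast module; find_syntax_errors is a hypothetical helper, and it catches only syntax errors, not failing imports or runtime errors in top-level code.

```python
import ast
import tempfile
from pathlib import Path
from typing import Dict


def find_syntax_errors(dags_folder: str) -> Dict[str, str]:
    """Return {file path: error message} for every .py file that fails to parse."""
    errors = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


if __name__ == "__main__":
    # Demonstrate on a temporary folder containing one valid and one broken file.
    with tempfile.TemporaryDirectory() as tmp:
        (Path(tmp) / "good_dag.py").write_text("x = 1\n")
        (Path(tmp) / "bad_dag.py").write_text("def broken(:\n    pass\n")
        for file_name, message in find_syntax_errors(tmp).items():
            print(file_name, message)
```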

Tasks Not Running

  1. Check task dependencies:

    • Verify that upstream tasks are completing
    • Look for depends_on_past=True settings
    • Check for missing external dependencies
  2. Verify pool availability:

    • Check if the task uses a pool and if slots are available
    • Adjust pool sizes if needed
  3. Check concurrency settings:

    # airflow.cfg
    [core]
    # Max task instances running at once across the whole installation
    parallelism = 32
    # Max concurrent task instances per DAG
    max_active_tasks_per_dag = 16
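
These limits stack: a task starts only when every applicable limit has room, so the tightest one decides how many tasks of a DAG can run at once. The sketch below is a rough upper-bound calculation under simplifying assumptions (all tasks share one pool and use one slot each); the function name is hypothetical.

```python
def max_concurrent_tasks(parallelism: int, max_active_tasks_per_dag: int, pool_slots: int) -> int:
    """Upper bound on simultaneously running tasks of one DAG that share one pool."""
    return min(parallelism, max_active_tasks_per_dag, pool_slots)


# With the settings above and a 128-slot pool, the per-DAG limit is the bottleneck.
print(max_concurrent_tasks(parallelism=32, max_active_tasks_per_dag=16, pool_slots=128))

# A small custom pool becomes the bottleneck instead.
print(max_concurrent_tasks(parallelism=32, max_active_tasks_per_dag=16, pool_slots=4))
```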

Failed Tasks

  1. Check task logs in the UI or on disk:

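    # The path layout depends on the Airflow version and log_filename_template;
    # Airflow 2.3+ defaults to dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<n>.log
    cat $AIRFLOW_HOME/logs/dag_id/task_id/execution_date/*.log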
  2. Re-run with debug info:

    # Increase log level
    export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
     
    # Re-run the task
    airflow tasks test dag_id task_id execution_date
  3. Common failure causes:

    • Connectivity issues to external systems
    • Missing dependencies
    • Resource constraints
    • Timeouts
    • Permissions problems

Performance Issues

  1. Check database health:

    -- For PostgreSQL
    SELECT state, COUNT(*) 
    FROM task_instance 
    GROUP BY state;
     
    -- Check for long-running queries
    SELECT pid, now() - pg_stat_activity.query_start AS duration, query
    FROM pg_stat_activity
    WHERE pg_stat_activity.state <> 'idle'
    ORDER BY duration DESC;
  2. Monitor resource usage:

    # Check worker resources
    htop
     
    # Check disk usage
    df -h
     
    # Check log size
    du -sh $AIRFLOW_HOME/logs/
  3. Optimize DAG parsing:

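    • Rely on DAG serialization (always enabled in Airflow 2) so the webserver reads DAGs from the database
    • Raise min_file_process_interval in the [scheduler] section to re-parse files less often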
    • Create smaller, focused DAG files

Resources and Further Learning

Books

  • “Data Pipelines with Apache Airflow” by Bas Harenslak and Julian de Ruiter
  • “Fundamentals of Data Engineering” by Joe Reis and Matt Housley

Join the Airflow community and contribute! Open source projects thrive on community involvement, whether through code contributions, documentation, or simply helping others in forums.


Conclusion

Apache Airflow provides a powerful platform for building and managing data pipelines. By understanding its architecture, concepts, and best practices, you can create robust, maintainable, and scalable workflows.

Key takeaways from this guide:

  1. DAGs are central to Airflow, providing a clear representation of workflow dependencies
  2. Python-based definition gives flexibility and power in workflow creation
  3. Operators encapsulate task logic, making workflows modular and reusable
  4. Testing is essential for reliable pipeline operation
  5. Production deployment requires careful consideration of security, scaling, and monitoring

As data pipelines grow in complexity, Airflow’s programmatic approach and rich ecosystem of integrations provide the tools needed to maintain control and visibility over your workflows.

We hope this guide helps you on your journey with Apache Airflow. Happy orchestrating!