Apache Airflow: The Go-To Library for Pipeline Orchestration in Python
Apache Airflow has rapidly established itself as the go-to library for pipeline orchestration within the Python ecosystem. Its rise to prominence can be attributed to its simplicity, extensibility, and robust feature set, which make it a preferred choice over similar solutions. In this article, we will explore the core concepts of Apache Airflow, its key components, and practical use cases, providing you with a comprehensive understanding of when and how to leverage this powerful tool.
Why and When Should I Consider Airflow?
Imagine you are tasked with building a machine learning pipeline that involves several steps, such as:
- Reading an image dataset from cloud-based storage
- Processing the images
- Training a deep learning model with the processed images
- Uploading the trained model to the cloud
- Deploying the model
While Cron jobs may seem like a straightforward solution for scheduling these tasks, they come with significant limitations, particularly in terms of scalability and error handling. Airflow, on the other hand, offers a robust framework for scheduling and managing complex workflows. It allows you to:
- Automatically re-run failed tasks
- Manage task dependencies
- Monitor workflows through logs and dashboards
Before diving into the specifics of building a machine learning pipeline with Airflow, let’s first understand its foundational concepts.
What is Airflow?
Apache Airflow is an open-source tool designed for authoring, scheduling, and monitoring data pipelines. It is particularly well-suited for ETL (Extract, Transform, Load) processes and MLOps (Machine Learning Operations) use cases. Common applications of Airflow include:
- Extracting data from various sources, transforming it, and storing it in a data warehouse
- Generating insights from data and displaying them in analytics dashboards
- Training, validating, and deploying machine learning models
Key Components of Airflow
When you install Airflow, you will encounter four primary components:
- Webserver: The user interface (UI) of Airflow, which allows users to interact with the system without needing the command line or API. The webserver lets users trigger and monitor pipelines, create connections with external systems, and inspect datasets.
- Executor: Executors are responsible for running the tasks defined in your pipelines. Airflow supports various executors, including the LocalExecutor, SequentialExecutor, CeleryExecutor, and KubernetesExecutor, allowing you to run tasks on a single machine or in a distributed environment.
- Scheduler: The scheduler is the heart of Airflow, responsible for executing tasks at the right time, re-running failed tasks, backfilling past intervals, and ensuring task completion.
- Metadata database: Airflow uses a database (typically PostgreSQL) to store metadata about the pipelines, including task states and execution history.
The easiest way to install Airflow is through Docker Compose. You can download the official Docker Compose file with the following command:
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
Airflow is also available on PyPI and can be installed using pip.
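For example, to install the same version with the official constraints file (the Python version in the URL, 3.8 here, is an assumption and should match your environment):
$ pip install "apache-airflow==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.8.txt"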
Basic Concepts of Airflow
To effectively use Airflow, it’s essential to understand its core concepts, which can initially seem complex. Let’s break them down.
DAGs (Directed Acyclic Graphs)
In Airflow, all workflows are defined as Directed Acyclic Graphs (DAGs). Each time a DAG is executed, a unique run is created, allowing multiple instances of the same DAG to run in parallel.
To instantiate a DAG, you can use the following code:
from airflow import DAG
from datetime import datetime, timedelta

with DAG(
    "mlops",
    default_args={
        "retries": 1,
    },
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
) as dag:
    pass
Tasks
Each node in a DAG represents a task, which is an individual piece of code. Tasks can have upstream and downstream dependencies, indicating the order in which they should be executed. When a new DAG run is initialized, each of its tasks is instantiated as a task instance tied to that run.
Operators
Operators are templates for predefined tasks, encapsulating boilerplate code and abstracting much of the underlying logic. Common operators include BashOperator, PythonOperator, MySqlOperator, and S3FileTransformOperator.
Here’s an example of defining tasks using operators:
with DAG("tutorial") as dag:
task1 = BashOperator(
task_id="print_date",
bash_command="date",
)
task2 = MySqlOperator(
task_id="load_table",
sql="/scripts/load_table.sql"
)
Task Dependencies
Defining dependencies between tasks is crucial for structuring your DAG. You can use the >> operator to set dependencies:
task1 >> task2 >> task3
Alternatively, you can use the set_downstream and set_upstream methods:
task1.set_downstream([task2, task3])
XComs (Cross-Communications)
XComs are used for communication between tasks, allowing them to push and pull data. However, there is a limit to the amount of data that can be passed through XComs, so for larger datasets, consider using external storage solutions.
Here’s an example of using XComs:
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG('mlops_dag')

def train_model(ti):
    # train_and_save_model() stands in for your own training code
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)

def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)
train_model_task >> deploy_model_task
Taskflow API
The Taskflow API simplifies task definition using Python decorators. If a task’s logic can be expressed in Python, you can define it with a simple annotation. This API also manages dependencies and communications between tasks automatically.
Here’s an example using the Taskflow API:
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2023, 1, 1), schedule_interval='@daily')
def mlops():
    @task
    def load_data():
        # load and return your dataset here
        df = ...
        return df

    @task
    def preprocessing(data):
        # clean and transform the data here
        return data

    @task
    def fit(data):
        # train a model on the data here
        return None

    df = load_data()
    data = preprocessing(df)
    model = fit(data)

dag = mlops()
Scheduling
Airflow’s scheduling capabilities are one of its core features. You can define the schedule using the schedule_interval argument, which accepts cron expressions, datetime.timedelta objects, or predefined presets such as @hourly or @daily.
Two important concepts related to scheduling are backfilling and catchup. Backfilling allows you to create past runs from the command line, while the catchup parameter determines whether Airflow should create DAG runs for all intervals from the start date to the current date.
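As a quick illustration, catchup is set per DAG, while backfilling is triggered from the CLI (the DAG id "mlops" and the dates are illustrative):
from datetime import datetime
from airflow import DAG

with DAG(
    "mlops",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,  # do not automatically create runs for past intervals
) as dag:
    pass

# Past runs can still be created explicitly from the command line, e.g.:
# $ airflow dags backfill mlops -s 2023-01-01 -e 2023-01-07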
Connections and Hooks
Airflow makes it easy to configure connections with external systems. Connections can be created through the UI, environment variables, or configuration files. Hooks are APIs that abstract communication with these external systems.
For example, you can define a PostgreSQL connection and use the PostgresHook to execute queries:
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('CREATE TABLE mytable (ModelID INT, ModelName VARCHAR(255))')
conn.commit()  # commit the DDL so the table is actually created
cursor.close()
conn.close()
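The postgres_custom connection itself can be created without the UI; for instance, Airflow reads any environment variable named AIRFLOW_CONN_<CONN_ID> as a connection URI (the credentials and host below are placeholders):
$ export AIRFLOW_CONN_POSTGRES_CUSTOM='postgres://user:password@localhost:5432/mydb'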
Advanced Concepts
To master Airflow, it’s beneficial to explore some advanced concepts:
- Branching: Allows you to split a task into multiple paths based on conditions, often using BranchPythonOperator (see the sketch after this list).
- Task Groups: Help organize tasks into a single unit, simplifying the graph view.
- Dynamic DAGs: DAGs and tasks can be created dynamically at runtime, ideal for parallel and input-dependent tasks.
- Unit Testing and Logging: Airflow provides functionality for running unit tests and logging information.
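Here is a minimal branching sketch (the DAG id, task ids, and the date-based condition are illustrative): the callable returns the task_id that should run next, and the other branch is skipped.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    # return the task_id (or list of task_ids) to execute next
    return 'train_small_model' if context['ds'] <= '2023-06-01' else 'train_large_model'

with DAG('branch_example', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
    small = BashOperator(task_id='train_small_model', bash_command='echo "small model"')
    large = BashOperator(task_id='train_large_model', bash_command='echo "large model"')

    branch >> [small, large]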
Airflow Best Practices
Before diving into a hands-on example, here are some best practices to consider:
- Idempotency: Ensure that DAGs and tasks are idempotent; re-executing the same run should yield the same result.
- Atomicity: Each task should perform a single operation and be independent of others.
- Incremental Filtering: Process only a batch of data in each run to minimize the impact of failures (see the sketch after this list).
- Top-Level Code: Avoid heavy top-level code in DAG files; anything outside task definitions is executed every time the scheduler parses the file, which hurts performance.
- Simplicity: Keep DAGs as simple as possible to enhance performance and maintainability.
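To make incremental processing concrete, a task can restrict itself to its own data interval using the data_interval_start and data_interval_end values from the task context; load_records_between is a hypothetical helper for your data source:
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def extract_batch():
    # process only the records that belong to this run's interval,
    # so retries and backfills never touch other batches
    ctx = get_current_context()
    return load_records_between(ctx['data_interval_start'], ctx['data_interval_end'])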
Example of an Airflow Pipeline
To illustrate the concepts discussed, let’s build a pipeline that trains a model and deploys it in Kubernetes. This pipeline will consist of five tasks:
- Read images from an AWS S3 bucket
- Preprocess the images using PyTorch
- Fine-tune a ResNet model with the downloaded images
- Upload the model to S3
- Deploy the model in a Kubernetes cluster
Here’s how to define the DAG:
from airflow import DAG
import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': datetime.timedelta(hours=1),
}

dag = DAG(
    'resnet_model',
    default_args=default_args,
    description='A simple DAG to demonstrate Airflow with PyTorch and Kubernetes',
    schedule_interval='@daily',
    catchup=False
)
Task Definitions
- Read Images from S3:
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def read_images_from_s3(**kwargs):
    s3_conn = S3Hook(aws_conn_id='aws_default')
    # list the object keys in the bucket and share them with downstream tasks
    images = [obj.key for obj in s3_conn.get_bucket('mybucket').objects.all()]
    kwargs['ti'].xcom_push(key='images', value=images)

read_images = PythonOperator(
    task_id='read_images',
    python_callable=read_images_from_s3,
    dag=dag
)
- Preprocess Images:
def preprocess_images(**kwargs):
    images = kwargs['ti'].xcom_pull(task_ids='read_images', key='images')
    # Preprocess images logic here
    train_images = images  # placeholder for the processed images
    kwargs['ti'].xcom_push(key='train_images', value=train_images)

preprocess = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_images,
    dag=dag
)
- Fit Model:
import torch

def fit_model(**kwargs):
    train_images = kwargs['ti'].xcom_pull(task_ids='preprocess', key='train_images')
    # Model training logic here; 'model' should hold the fine-tuned ResNet
    model = ...
    torch.save(model, 'trained_model.pt')

fit_model_task = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    dag=dag
)
- Upload Model to S3:
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# LocalFilesystemToS3Operator uploads a local file to S3; the
# S3FileTransformOperator copies between S3 locations, so it does not fit here.
upload_model = LocalFilesystemToS3Operator(
    task_id='upload_model',
    filename='trained_model.pt',
    dest_key='trained_model.pt',
    dest_bucket='my-model-bucket',
    aws_conn_id='aws_default',
    replace=True,
    dag=dag
)
- Deploy Model in Kubernetes:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

deploy_model = KubernetesPodOperator(
    namespace='default',
    image='myimage:latest',
    name='deploy-model',
    task_id='deploy_model',
    cmds=['python', 'deploy.py'],
    # pass the S3 location of the uploaded model to the deployment script
    arguments=['s3://my-model-bucket/trained_model.pt'],
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag
)
Setting Task Dependencies
Finally, set the dependencies to form the DAG:
read_images >> preprocess >> fit_model_task >> upload_model >> deploy_model
This DAG will be initialized by Airflow and can be monitored through the UI. The scheduler will handle executing the tasks in the correct order and at the appropriate times.
Conclusion
Apache Airflow is an invaluable tool for data engineering and machine learning workflows. Its flexibility and scalability make it a strong choice for orchestrating complex pipelines. While it has its challenges, understanding its core concepts and best practices will empower you to leverage Airflow effectively.
If you’re interested in deepening your knowledge of Airflow, consider exploring the following resources:
- ETL and Data Pipelines with Shell, Airflow, and Kafka – A course by IBM on Coursera.
- Data Engineering with AWS Nanodegree – A program from AWS on Udacity that focuses heavily on Airflow.
Stay connected with us for more tutorials on popular data engineering libraries, and don’t forget to follow us on X or LinkedIn for the latest updates.