Apache Airflow Tutorial – Part 4: DAG Patterns

apache airflow, distributed computing, docker, job queues, python · Mar 21, 2019

Overview

In the previous parts of this series, I introduced Apache Airflow in general, demonstrated my Docker dev stack, and built out a simple linear DAG definition. I want to wrap up the series by showing a few other common DAG patterns I regularly use.

In order to follow along, get the source code!

Bring up your Airflow Development Environment

unzip airflow-template.zip
cd airflow-template
docker-compose up -d
docker-compose logs airflow_webserver
Bash

This will take a few minutes to get everything initialized, but once it's up the webserver logs will show that Airflow is ready.

DAG Patterns

I use 3 main DAG patterns: Simple (shown in Part 3), Linear, and Gather. Of course, once you master these patterns, you can combine them to build much more complex pipelines.

Simple DAG Pattern

What I call a simple pattern (and I have no idea if any of these patterns have official names) is a chain of tasks where each task depends upon the previous one. In this case make_icecream_sundae_task depends upon choose_toppings_task, which depends upon choose_icecream_flavor_task, and finally choose_icecream_flavor_task depends upon choose_cone_task.

If you read Part 3 of the series, you will have seen this dependency chain written out as:

choose_icecream_flavor_op.set_upstream(choose_cone_op)
choose_toppings_op.set_upstream(choose_icecream_flavor_op)
make_icecream_sundae_op.set_upstream(choose_toppings_op)
Python
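
Airflow also supports bitshift composition with >> and <<, which expresses the same chain in a single line, reading left to right from upstream to downstream. This is equivalent to the set_upstream calls above, though the tutorial's code sticks with set_upstream/set_downstream:

# Equivalent to the three set_upstream calls above
choose_cone_op >> choose_icecream_flavor_op >> choose_toppings_op >> make_icecream_sundae_op
Python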

Linear DAG Pattern

Now, let's say that I have 2 lists of tasks: taskA_1, taskA_2, taskA_3 and taskB_1, taskB_2, taskB_3, where taskB_1 depends upon taskA_1, taskB_2 depends upon taskA_2, and taskB_3 depends upon taskA_3.

In keeping with my ice cream theme, I've created a list of ice cream tasks. This example is a bit silly, because we could just as well use the first example and simply throw more workers or CPUs at it to get the same result, but in the interest of instruction just go with it.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
import random
from pprint import pprint
from icecream_sunday_dag_def import choose_icecream_flavor, choose_cone, choose_toppings, make_icecream_sundae, \
    default_args

icecream_sundae_linear_dag = DAG('ice_cream_sundae_linear', default_args=default_args, schedule_interval=None)

"""
Linear Ice Cream Sundae DAG Def
"""


def generate_choose_cone_op(dag, task_id):
    return PythonOperator(
        task_id='choose_cone_task_{}'.format(task_id),
        python_callable=choose_cone,
        provide_context=True,
        dag=dag,
    )


def generate_choose_icecream_flavor_op(dag, task_id):
    return PythonOperator(
        task_id='choose_icecream_flavor_task_{}'.format(task_id),
        provide_context=True,
        dag=dag,
        python_callable=choose_icecream_flavor,
    )


def generate_choose_toppings_op(dag, task_id):
    return PythonOperator(
        task_id='choose_toppings_task_{}'.format(task_id),
        provide_context=True,
        dag=dag,
        python_callable=choose_toppings,
    )


def generate_make_icecream_sundae_op(dag, task_id):
    return PythonOperator(
        task_id='make_icecream_sundae_task_{}'.format(task_id),
        provide_context=True,
        dag=dag,
        python_callable=make_icecream_sundae,
    )


# Build two independent sets of cone, flavor, and toppings tasks
choose_cones_op_list = [generate_choose_cone_op(icecream_sundae_linear_dag, 1),
                        generate_choose_cone_op(icecream_sundae_linear_dag, 2)]
choose_flavors_op_list = [generate_choose_icecream_flavor_op(icecream_sundae_linear_dag, 1),
                          generate_choose_icecream_flavor_op(icecream_sundae_linear_dag, 2)]
choose_toppings_op_list = [generate_choose_toppings_op(icecream_sundae_linear_dag, 1),
                           generate_choose_toppings_op(icecream_sundae_linear_dag, 2)]

# Wire each chain pairwise: cone_N -> flavor_N -> toppings_N
for index, choose_cone_op in enumerate(choose_cones_op_list):
    choose_cone_op.set_downstream(choose_flavors_op_list[index])
    choose_flavors_op_list[index].set_downstream(choose_toppings_op_list[index])
Python

You will see that for this example, instead of directly declaring my operator instances, I wrapped each one in a function that returns an instance of an operator. This is a much more flexible way of declaring your operators, and I recommend using it. You can even pass the DAG in as a parameter of your function, allowing you to reuse operators across DAGs if that is what you are going for.
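
Here is a minimal sketch of that reuse. The two DAGs below (a dev and a prod variant) are hypothetical and not part of the tutorial's source, but the factory function is the same generate_choose_cone_op defined above:

# Hypothetical DAGs used only to illustrate reusing the factory function
sundae_dev_dag = DAG('ice_cream_sundae_dev', default_args=default_args, schedule_interval=None)
sundae_prod_dag = DAG('ice_cream_sundae_prod', default_args=default_args, schedule_interval='@daily')

# The same factory attaches an independent operator instance to each DAG
dev_cone_op = generate_choose_cone_op(sundae_dev_dag, 1)
prod_cone_op = generate_choose_cone_op(sundae_prod_dag, 1)
Python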

If you open your browser to http://localhost:8089/admin/airflow/graph?dag_id=ice_cream_sundae_linear&execution_date= you will see the linear DAG graph.

Gather DAG Pattern

This last pattern is similar to the previous one, but instead of completely separate task paths, you have a single task that depends upon a group of other tasks. The beauty of using Airflow for this is that, assuming you have enough computational power, choose_cone_task_1, choose_icecream_flavor_task_1, and choose_toppings_task_1 will all run in parallel. NEAT!

Open up http://localhost:8089/admin/airflow/graph?dag_id=ice_cream_sundae_gather_dag&execution_date= to see this DAG layout.

And here is the code for our final DAG pattern.

from airflow import DAG
from icecream_sunday_dag_def import default_args
from icecream_sundae_linear_dag_def import generate_choose_cone_op, generate_choose_toppings_op, \
    generate_choose_icecream_flavor_op, generate_make_icecream_sundae_op

icecream_sundae_gather_dag = DAG('ice_cream_sundae_gather_dag', default_args=default_args, schedule_interval=None)

choose_cone_op = generate_choose_cone_op(icecream_sundae_gather_dag, 1)
choose_icecream_flavor_op = generate_choose_icecream_flavor_op(icecream_sundae_gather_dag, 1)
choose_toppings_op = generate_choose_toppings_op(icecream_sundae_gather_dag, 1)
make_icecream_sundae_op = generate_make_icecream_sundae_op(icecream_sundae_gather_dag, 1)

# All three choose_* tasks must finish before the sundae task runs
make_icecream_sundae_op.set_upstream([choose_cone_op, choose_icecream_flavor_op, choose_toppings_op])
Python
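
As with the simple pattern, this fan-in can also be written with bitshift composition; a list on the left sets every operator in it upstream of the task on the right:

# Equivalent to the set_upstream call above
[choose_cone_op, choose_icecream_flavor_op, choose_toppings_op] >> make_icecream_sundae_op
Python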

Wrap Up

That's it! I hope you found this series helpful and that it demonstrated how you can use these DAG patterns as building blocks for more complex functionality.

For more Airflow fun, check out this great curated list of Airflow-related blog posts and resources: Awesome Apache Airflow.
