
Unleashing the power of TaskFlow API in Apache Airflow

Ferruzzi
Apache Airflow


Apache Airflow is an orchestration platform for programmatically authoring, scheduling, and executing workflows. Workflows are built by chaining together Operators, building blocks that perform individual tasks, into a Directed Acyclic Graph (DAG). The TaskFlow API was introduced in Airflow 2.0 and is a wonderful alternative to the PythonOperator. We are going to look at a few use cases where TaskFlow excels and see how it compares to writing a DAG with the traditional PythonOperator. If you are unfamiliar with the basics of writing a DAG in Apache Airflow 2, you may want to check out the Airflow Fundamental Concepts review on the Airflow Tutorials page or this Apache Airflow 2.0 Tutorial blog post to get the most out of this blog post.

If you have used the PythonOperator with Airflow in the past, you may be wondering why you should change now. The TaskFlow API offers a number of benefits over the traditional PythonOperator. Some of the most obvious are:

  1. Reduced boilerplate code
  2. Intuitive data transfer between tasks
  3. No extra code needed to declare the dependency chain
  4. Simplified task instantiation

Side-by-Side Comparison

A note on code snippets: The code snippets below work in Airflow 2.4.0+. Older versions of Airflow may require some adjustments such as replacing schedule=None with schedule_interval=None in the DAG declarations.

Let’s use this sample Extract, Transform, and Load (ETL) DAG, which uses the traditional PythonOperator, as our starting point.

import json
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator

def extract(ti=None, **kwargs):
    """
    Pushes the estimated population (in millions) of
    various cities into XCom for the ETL pipeline.
    Obviously in reality this would be fetching this
    data from some source, not hardcoded values.
    """
    sample_data = {"Tokyo": 3.7, "Jakarta": 3.3, "Delhi": 2.9}
    ti.xcom_push("city_populations", json.dumps(sample_data))


def transform(ti=None, **kwargs):
    """
    Pulls the provided raw data from XCom and pushes
    the name of the largest city in the set to XCom.
    """
    raw_data = ti.xcom_pull(task_ids="extract", key="city_populations")
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    ti.xcom_push("largest_city", largest_city)


def load(ti=None, **kwargs):
    """
    Loads and prints the name of the largest city in
    the set as determined by the transform.
    """
    largest_city = ti.xcom_pull(task_ids="transform", key="largest_city")

    print(largest_city)


with DAG(
    dag_id="city_pop_etl_pythonoperator",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )

    chain(
        extract_task,
        transform_task,
        load_task,
    )

Admittedly, the DAG above is oversimplified for a real ETL job, but it is fully functional (copy and paste it into your Airflow environment and it will run) and it is enough of a starting point to show what TaskFlow can do for you. extract_task simulates fetching a dictionary of city names and their populations and pushes it into XCom, transform_task pulls that raw data and pushes the name of the city with the largest population back into XCom, and load_task pulls that value from XCom and prints it to the task log.

Next, we will look at the same DAG rewritten to make use of TaskFlow methods.

import json
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def extract():
    """
    Pushes the estimated population (in millions) of
    various cities into XCom for the ETL pipeline.
    Obviously in reality this would be fetching this
    data from some source, not hardcoded values.
    """
    sample_data = {"Tokyo": 3.7, "Jakarta": 3.3, "Delhi": 2.9}
    return json.dumps(sample_data)


@task
def transform(raw_data: str):
    """
    Loads the provided raw data from XCom and pushes
    the name of the largest city in the set to XCom.
    """
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    return largest_city


@task
def load(largest_city):
    """
    Prints the name of the largest city in
    the set as determined by the transform.
    """
    print(largest_city)


with DAG(
    dag_id="city_pop_etl_taskflow",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    extracted_data = extract()
    largest_city = transform(extracted_data)
    load(largest_city)

(Image: a side-by-side diff of the two code snippets above.)

Both of these DAGs perform the same work, but there are a few differences you will notice immediately. The first thing that jumps out when they are viewed side by side is how much repetitive code has been trimmed out: the TaskFlow version is 28% shorter! The XCom interactions have been completely abstracted away as well, making the remaining code much easier to understand, and writing a Python task now looks and feels like writing normal Python.

The dependency chain is now constructed implicitly, based on how the tasks create and consume inputs and outputs, so the explicit declaration at the end is gone too. If you load both of these DAGs into Airflow, their dependency graphs are identical; the TaskFlow API is smart enough to determine the chain from which values are passed to downstream tasks.

You might also notice that each of the PythonOperators in the original DAG declared a task_id, and those are gone now. The @task decorator uses the name of the method it decorates as the task_id by default. In the code snippet below, both tasks are functionally identical; task_id="cleanup" is implied in the first one and handled behind the scenes.

@task
def cleanup():
    ...


@task(task_id="cleanup")
def cleanup():
    ...

You can override that and include other BaseOperator parameters in the task decorator if you wish. For example:

from airflow.utils.trigger_rule import TriggerRule


@task(task_id="teardown_task", trigger_rule=TriggerRule.ALL_DONE)
def cleanup():
    ...

As mentioned above, even in this simple example we can already see how much more concise and simpler the code becomes.
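
One caveat on the implicit dependency chain: it is inferred from data flow, so if two decorated tasks exchange no data, you still declare the ordering yourself. Here is a minimal sketch (the task names are hypothetical, and the last line belongs inside a DAG context):

@task
def create_staging_table():
    ...


@task
def notify_team():
    ...


# Calling a decorated task returns a reference (an XComArg) that supports the
# usual bit-shift dependency syntax, so explicit ordering still works when no
# data is passed between the tasks.
create_staging_table() >> notify_team()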

Multiple XCom Values

What about returning multiple values? What if I wanted transform() to return both the city name and the population? With the old method it is possible to add one or more ti.xcom_push() calls, like this:

def transform(ti=None, **kwargs):
    raw_data = ti.xcom_pull(task_ids="extract", key="city_populations")
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    ti.xcom_push("largest_city_name", largest_city)
    ti.xcom_push("largest_city_population", data[largest_city])


def load(ti=None, **kwargs):
    city = ti.xcom_pull(task_ids="transform", key="largest_city_name")
    pop = ti.xcom_pull(task_ids="transform", key="largest_city_population")

    print(f"City: {city}\tPopulation: {pop}")

With TaskFlow you can simply return the values in any JSON-serializable form. You might use a dictionary:

@task
def transform(raw_data: str):
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    return {"name": largest_city, "pop": data[largest_city]}


@task
def load(largest_city):
    print(f"City: {largest_city['name']}\tPop: {largest_city['pop']}")

Or maybe a tuple suits your needs?

@task
def transform(raw_data: str):
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    return largest_city, data[largest_city]


@task
def load(largest_city):
    name, pop = largest_city
    print(f"City: {name}\tPop: {pop}")

Combining TaskFlow methods with Operators

You can even mix TaskFlow methods and traditional Operators in the same DAG, with some modifications. The following DAG combines the two as an example.

import json
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator,
    S3ListOperator,
)


@task
def extract(s3_key_list: list):
    """
    Retrieves data from S3 and pushes it into XCom.
    """
    # This is somewhat contrived for the sake of the
    # demo, but read a json string from the first S3
    # object in the provided list and return it.
    return S3Hook().read_key(key=s3_key_list[0], bucket_name="bucket_name")


@task
def transform(raw_data: str):
    """
    Loads the provided raw data from XCom and pushes the
    name of the largest city in the set to XCom.
    """
    data = json.loads(raw_data)

    largest_city = max(data, key=data.get)
    return largest_city


@task
def load(largest_city):
    """
    Prints the name of the largest city in
    the set as determined by the transform.
    """
    name = largest_city
    print(f"Largest City: {name}")


with DAG(
    dag_id="city_pop_etl_hybrid",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    list_files = S3ListOperator(
        task_id="list_files",
        bucket="bucket_name",
    )
    extracted_data = extract(list_files.output)
    largest = transform(extracted_data)
    store_results = S3CreateObjectOperator(
        task_id="store_results",
        s3_bucket="bucket_name",
        s3_key="saved_results",
        data=largest,
    )

    chain(
        list_files,
        extracted_data,
        largest,
        load(largest),
        store_results,
    )

There are a few things to note here. Unfortunately, the requirement to explicitly define the dependency chain is back: Airflow can infer the dependencies between TaskFlow methods, but not once a traditional Operator enters the fray. On the other hand, getting XCom values to and from those traditional Operators is much easier. We can get the output of a traditional Operator through its .output attribute, as seen in extracted_data = extract(list_files.output), and a TaskFlow output can be sent into a traditional Operator directly, as seen in the S3CreateObjectOperator's data parameter. Because transform() returns a plain string, its result (largest) can be handed to that parameter as-is; there is no need to explicitly fetch it from XCom the way we did in our very first example. You may also note that a TaskFlow method can be used directly in the chain() definition without first assigning its result to a variable, as we did here with load(largest).

One key caveat: sending a value into a traditional Operator this way only works for parameters listed in the Operator's template_fields. If the parameter is not a template field, you still have to fall back on the old xcom_pull() technique.
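
To make that caveat concrete, here is a minimal sketch of both sides of the rule. The archive task is hypothetical and not part of the DAG above; the template_fields check simply shows which parameters of a traditional Operator accept an XComArg or template.

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

# Every Operator lists its templated parameters; only these accept XComArgs
# or Jinja templates directly.
print(S3CreateObjectOperator.template_fields)


@task
def archive(ti=None):
    # The fallback for anything that is not templated: pull the value manually.
    # A decorated task can still receive the task instance ("ti") and use the
    # classic xcom_pull(); with no key given, it reads transform's return value.
    largest = ti.xcom_pull(task_ids="transform")
    print(f"Archiving {largest}")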

Summary

If you are using Python to perform tasks in your workflows, TaskFlow methods make a great replacement for the PythonOperator. They reduce boilerplate code and simplify your DAGs, while still interacting cleanly with your other Operators.

Still have questions? Join the conversation over on the Airflow community Slack server!
