AWS Big Data Blog

Orchestrate AWS Glue DataBrew jobs using Amazon Managed Workflows for Apache Airflow

As data volumes grow, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Analysts build complex data transformation pipelines that include multiple steps for data preparation and cleansing. However, analysts often want a simpler orchestration mechanism with a graphical user interface that scales to larger data volumes and is easy to maintain. To support these requirements, you can use AWS Glue DataBrew for data preparation and Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate your workflows. In this post, we discuss how to configure this integration.

Glue DataBrew is a new visual data preparation tool that helps you clean and normalize data without writing code. Analysts can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values. Glue DataBrew is serverless, which helps analysts focus on exploring and transforming terabytes of raw data without needing to create clusters or manage any infrastructure. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Amazon MWAA is a fully managed service that makes it easy to run the open-source version of Apache Airflow on AWS and build workflows to run extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account with access to AWS Glue DataBrew, Amazon MWAA, Amazon S3, and Amazon Athena
  • An IAM role that allows DataBrew to read from and write to your S3 buckets

Data preparation

To illustrate the orchestration of DataBrew jobs using Amazon MWAA, we use the following publicly available New York City Taxi and Limousine Commission (TLC) datasets:

  • Yellow taxi trip records
  • Green taxi trip records
  • Taxi zone lookup data (taxi_lookup.csv)

Download these datasets to use in the following steps.

We create input and output S3 buckets with subfolders to capture the yellow taxi data, green taxi data, and lookup data. Then we upload the input data into the input S3 bucket in their respective folders. The following table outlines the overall structure in Amazon S3.

S3 input bucket structure:
input-bucket-name
  	|- yellow
  	|- green
  	|- taxi-lookup

S3 output bucket structure:
output-bucket-name
  	|- yellow
  	|- green
  	|- aggregated_summary

Airflow bucket structure:
airflow-bucket-name
  	|- dags
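
If you prefer to script this setup, the following minimal boto3 sketch creates the three buckets and uploads the downloaded files into their respective prefixes. The bucket names and local file names are placeholders; replace them with your own.

import boto3

s3 = boto3.client('s3')

# Placeholder bucket names -- replace with globally unique names in your account.
for bucket in ('input-bucket-name', 'output-bucket-name', 'airflow-bucket-name'):
    s3.create_bucket(Bucket=bucket)  # add CreateBucketConfiguration for Regions other than us-east-1

# Upload the downloaded datasets into their input prefixes (local file names are examples).
s3.upload_file('yellow_tripdata.csv', 'input-bucket-name', 'yellow/yellow_tripdata.csv')
s3.upload_file('green_tripdata.csv', 'input-bucket-name', 'green/green_tripdata.csv')
s3.upload_file('taxi_lookup.csv', 'input-bucket-name', 'taxi-lookup/taxi_lookup.csv')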

Solution overview

To implement our solution, we first create a DataBrew project and DataBrew jobs for data preparation and cleansing. Then we integrate the DataBrew jobs with Amazon MWAA and schedule the workflow. We can validate the table data in Athena. The following diagram illustrates the architecture of this solution.

Create a DataBrew dataset for taxi lookup

After you upload the public datasets to the input S3 bucket (input-bucket-name in respective folders), you can create a DataBrew dataset for the taxi lookup data.

  1. On the DataBrew console, choose Datasets.
  2. Choose Connect new dataset.
  3. For Dataset Name, enter a name (for this post, ny-taxi-lookup).
  4. Connect to the taxi_lookup.csv public dataset.
  5. Choose Create dataset.
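
The console steps above can also be scripted. The following boto3 sketch registers an equivalent DataBrew dataset that points at the lookup file in the input bucket (the bucket name and object key are placeholders):

import boto3

databrew = boto3.client('databrew')

# Register the taxi lookup CSV from the input bucket as a DataBrew dataset.
databrew.create_dataset(
    Name='ny-taxi-lookup',
    Format='CSV',
    Input={
        'S3InputDefinition': {
            'Bucket': 'input-bucket-name',        # placeholder bucket name
            'Key': 'taxi-lookup/taxi_lookup.csv'  # placeholder object key
        }
    },
)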

Create a DataBrew project for green taxi data

To illustrate a DataBrew project with simple transformation logic, we create a recipe with the following steps:

  • Use a zone lookup to pull location names based on PULocationID.
  • Rename the column Zone to pickup_zone.
  • Use a zone lookup to pull location names based on DOLocationID.
  • Rename the column Zone to dropoff_zone.
  • Rename the column lpep_pickup_datetime to pickup_datetime.
  • Rename the column lpep_dropoff_datetime to dropoff_datetime.
  • Add a new column partition_column using the dateTime function.

To create a DataBrew project for green taxi data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create project.
  3. For Project name, enter a name (for this post, green-taxi).
  4. Under Select a dataset, select New dataset.
  5. For Enter your source from S3, enter the S3 input bucket path of the green taxi input CSV file.
  6. Under Permissions, for Role name, choose an IAM role that allows DataBrew to read and write from your input S3 bucket.

You can choose a role if you already created one, or create a new one.

  7. Choose Create project.

The dataset can take approximately 2 minutes to load.

  8. After the dataset is loaded, choose Join.
  9. For Select Dataset, choose ny-taxi-lookup and choose Next.
  10. Choose Left join.
  11. For Table A, choose PULocationID.
  12. For Table B, choose LocationID.
  13. Under Column list, choose the following columns: vendorid, lpep_pickup_datetime, lpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
  14. Choose Finish.

This process adds the first step in the recipe (use zone lookup to pull the location name based on PULocationID).

  15. Choose Add step to add another transformation.
  16. Under Column Actions, choose Rename.
  17. For Source column, choose Zone.
  18. For New column name, enter pickup_zone.
  19. Choose Apply.

This process adds the second step in the recipe (rename the column Zone to pickup_zone).

  20. Add the remaining steps to the recipe:
    1. Use zone lookup to pull the location name based on DOLocationID.
    2. Under Column list, select a limited list of columns from Table B: vendorid, lpep_pickup_datetime, lpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
    3. Rename the column Zone to dropoff_zone.
    4. Rename the column lpep_pickup_datetime to pickup_datetime.
    5. Rename the column lpep_dropoff_datetime to dropoff_datetime.

We now add a new column named partition_column.

  21. On the Functions menu, under Date functions, choose DATETIME.
  22. Create a new column called partition_column to use for partitioning in future steps.
  23. Provide a description of the recipe version and choose Publish.
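
DataBrew assigns a version number each time you publish a recipe, and recipe jobs run against a published version. If you prefer to publish from code, a minimal boto3 sketch looks like the following; green-taxi-recipe is an assumption based on the default name DataBrew gives a recipe created from the green-taxi project, so adjust it to the name shown in your console.

import boto3

databrew = boto3.client('databrew')

# Publish the latest working version of the project's recipe.
databrew.publish_recipe(
    Name='green-taxi-recipe',  # assumed default recipe name for the green-taxi project
    Description='Join green taxi data with the zone lookup, rename columns, and add partition_column',
)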

Create a DataBrew job for green taxi data

Now that our recipe is ready, we create a DataBrew job for our dataset.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create job.
  3. For Job name, enter a name (for example, green-taxi-job).
  4. For Job type, select Create a recipe job.
  5. For Run on, select Project.
  6. For Select a project, search for and choose your project (green-taxi).
  7. Under Job output settings, for File type, choose PARQUET as the final storage format (other options are available).
  8. For S3 location, enter your final S3 output bucket path.
  9. For Compression, choose Snappy (other options are available).
  10. Under Additional configurations, for Custom partition by column values, choose partition_column and choose Add.
  11. For File output storage, select Replace output files for each job run (for our use case, we want to do a full refresh).
  12. Under Permissions, for Role name, choose your IAM role.
  13. Choose Create job.

Now the job is ready. You can choose the job on the Jobs page, choose View Details, and view the details on the Data Lineage tab.
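
You can also create and start the recipe job programmatically. The following boto3 sketch mirrors the console settings above; the role ARN is a placeholder, and the project and bucket names match the examples in this post.

import boto3

databrew = boto3.client('databrew')

# Create a recipe job that writes Snappy-compressed Parquet, partitioned by partition_column.
databrew.create_recipe_job(
    Name='green-taxi-job',
    ProjectName='green-taxi',
    RoleArn='arn:aws:iam::111122223333:role/databrew-role',  # placeholder role ARN
    Outputs=[
        {
            'Location': {'Bucket': 'output-bucket-name', 'Key': 'green/'},
            'Format': 'PARQUET',
            'CompressionFormat': 'SNAPPY',
            'PartitionColumns': ['partition_column'],
            'Overwrite': True,  # replace output files for each job run
        }
    ],
)

# Optionally start a run on demand; the Airflow DAG later in this post uses start_job_run the same way.
databrew.start_job_run(Name='green-taxi-job')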

Repeat the steps from the green taxi sections to create a DataBrew project, job, and the corresponding recipe steps for the yellow taxi data:

  • Use a zone lookup to pull location names based on PULocationID and keep the columns vendorid, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
  • Rename the column Zone to pickup_zone.
  • Use a zone lookup to pull location names based on DOLocationID and keep the columns vendorid, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
  • Rename the column Zone to dropoff_zone.
  • Rename the column tpep_pickup_datetime to pickup_datetime.
  • Rename the column tpep_dropoff_datetime to dropoff_datetime.
  • Add a new column partition_column using the dateTime function.

The following diagram shows the data lineage for the yellow taxi data.

Create Athena tables

Create the following external tables on the Athena console:

  • yellow_taxi – Contains the latest snapshot of every day’s yellow taxi data
  • green_taxi – Contains the latest snapshot of every day’s green taxi data
  • aggregate_summary – Contains the historical snapshot of every day’s aggregated data

Run the following DDL statements to create the yellow_taxi, green_taxi, and aggregate_summary tables:

CREATE EXTERNAL TABLE IF NOT EXISTS default.yellow_taxi
(
  vendorid int, 
  pickup_datetime timestamp, 
  dropoff_datetime timestamp, 
  passenger_count int, 
  trip_distance double, 
  total_amount double,
  pickup_zone string,
  dropoff_zone string
)
PARTITIONED BY (partition_column string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://output-bucket-name/yellow/';

CREATE EXTERNAL TABLE IF NOT EXISTS default.green_taxi
(
  vendorid int, 
  pickup_datetime timestamp, 
  dropoff_datetime timestamp, 
  passenger_count int, 
  trip_distance double, 
  total_amount double,
  pickup_zone string,
  dropoff_zone string
)
PARTITIONED BY (partition_column string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://output-bucket-name/green/';

CREATE EXTERNAL TABLE default.aggregate_summary
(
  taxi_type varchar(6), 
  vendorid int, 
  pickup_datetime timestamp, 
  dropoff_datetime timestamp, 
  passenger_count int, 
  trip_distance double, 
  total_amount double, 
  pickup_zone string, 
  dropoff_zone string 
)
PARTITIONED BY (partition_column string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://output-bucket-name/aggregated_summary/'
TBLPROPERTIES ('has_encrypted_data'='false');
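
You can also submit these DDL statements from code instead of the Athena console. The following sketch assumes the three statements above are saved as local .sql files and that you supply your own query result location.

import time
import boto3

athena = boto3.client('athena')
ATHENA_OUTPUT = 's3://athena-query-result-location/'  # placeholder query result location

def run_ddl(ddl):
    # Submit a DDL statement to Athena and poll until it finishes.
    query = athena.start_query_execution(
        QueryString=ddl,
        QueryExecutionContext={'Database': 'default'},
        ResultConfiguration={'OutputLocation': ATHENA_OUTPUT},
    )
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query['QueryExecutionId'])
        status = execution['QueryExecution']['Status']['State']
        if status in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return status
        time.sleep(2)

# Assumes the CREATE EXTERNAL TABLE statements above are saved as these local files.
for ddl_file in ('yellow_taxi.sql', 'green_taxi.sql', 'aggregate_summary.sql'):
    with open(ddl_file) as f:
        print(ddl_file, run_ddl(f.read()))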

Create an Airflow DAG

Your next step is to create an Airflow Directed Acyclic Graph (DAG).

  1. Create a local file called requirements.txt with the following content:
    boto3 >= 1.17.9
  2. Upload requirements.txt to the S3 bucket airflow-bucket-name.
  3. Create an Amazon MWAA environment. For instructions, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).
  4. Create the local file ny_taxi_brew_trigger.py with the following code and upload it to the S3 bucket airflow-bucket-name/dags (provide the location for the Athena query results and the name of the output bucket, and update the cron entries if you want to change the job run frequency or time):
    import datetime
    import os
    import boto3
    from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
    from airflow.contrib.operators.s3_delete_objects_operator import S3DeleteObjectsOperator
    from airflow.hooks import S3_hook
    import sys
    import time
    from airflow import DAG
    
    DEFAULT_ARGS = {
        "owner": "admin",
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": True,
        "email_on_retry": False,
    }
    
    # S3 Prefix check. Function checks if s3 prefix exists and pushes variable True/False in xcom.
    def check_prefix(**kwargs):
      s3_hook = S3_hook.S3Hook()
      prefix_status=s3_hook.check_for_prefix(bucket_name=bucket_name,prefix=s3_prefix+s3_partition+"="+today_date,delimiter='/')
      kwargs['ti'].xcom_push(key='value from xcom', value=prefix_status)
      print(prefix_status)
      return prefix_status
    
    # Delete the S3 prefix using boto3. Executed only when the prefix exists (prefix_status is True).
    def delete_prefix_boto(**kwargs):
        s3_cli = boto3.resource('s3')
        bucket = s3_cli.Bucket(bucket_name)
        for obj in bucket.objects.filter(Prefix=s3_prefix+s3_partition+"="+today_date):
            s3_cli.Object(bucket.name, obj.key).delete()
    
    #Branch and call correct operator as per xcom variable
    def branch(**kwargs):
        return 'prefix_' + str(kwargs['ti'].xcom_pull(task_ids='prefix_exists', key='value from xcom'))
    
    # Trigger a DataBrew job using the boto3 client and poll until it completes.
    def run_customer_job(**kwargs):
        job_name=kwargs['job_name']
        print("Starting Brew Job")
    # Trigger the DataBrew job and monitor its state.
        run_id=glue_databrew_client.start_job_run(Name=job_name)
        state = glue_databrew_client.describe_job_run(Name=job_name,RunId=run_id['RunId'])
    #Keep polling every 30 seconds to see if there is a status change in Job run.
        if state:
            status=state['State']
            while status not in ['SUCCEEDED']:
                print("Sleeping")
                time.sleep(30)
                job_status = glue_databrew_client.describe_job_run(Name=job_name,RunId=run_id['RunId'])
                status = job_status['State']
                print("Checking the status. Current status is",status)
                if status in ['STOPPED', 'FAILED', 'TIMEOUT']:
                    sys.exit(1)
    
    # Initialization of variable
    region_name=boto3.session.Session().region_name
    glue_databrew_client=boto3.client('databrew',region_name=region_name)
    today_date=str(datetime.date.today())
    #Update the athena path here
    athena_output = 's3://athena-query-result-location/' ### Update with your athena query result location
    #Update the prefix of aggregated summary here
    s3_prefix='aggregated_summary/'
    s3_partition='partition_column'
    
    #Update the bucket name where aggregated summary will be stored
    bucket_name = 'output-bucket-name' ### Update with your s3 output-bucket-name
    
    yellow_update_partition="""MSCK REPAIR TABLE default.yellow_taxi;"""
    green_update_partition="""MSCK REPAIR TABLE default.green_taxi;"""
    stage_table = """create table aggregate_staging WITH (partitioned_by = ARRAY['partition_column']) as (
    select 'yellow' as taxi_type,* from default.yellow_taxi
    union
    select 'green' as taxi_type,* from default.green_taxi)
    """
    insert_aggregate_table="""
    insert into aggregate_summary(
    select * from aggregate_staging
    );
    """
    drop_stage_table = """drop table aggregate_staging"""
    
    dag = DAG(
        dag_id="nytaxi-brew-job",
        default_args=DEFAULT_ARGS,
        default_view="graph",
        schedule_interval="19 02 * * *", #### Change cron entry to schedule the job
        start_date=datetime.datetime(2021, 3, 8), ### Modify start date accordingly
        catchup=False, ### Set to True if backfill is required.
        tags=["example"],
    )
    
    fork = BranchPythonOperator(
        task_id='Prefix_True_or_False',
        python_callable=branch,
        provide_context=True,
        dag=dag)
    
    
    yellow_taxi = PythonOperator(task_id='yellow_taxi',python_callable=run_customer_job,op_kwargs={'job_name':'yellow-taxi-job'},dag=dag)
    green_taxi = PythonOperator(task_id='green_taxi',python_callable=run_customer_job,op_kwargs={'job_name':'green-taxi-job'},dag=dag)
    s3_prefix_exists = PythonOperator(task_id='prefix_exists',python_callable=check_prefix,op_kwargs={'s3_prefix':s3_prefix,'s3_partition':s3_partition,'kwargs_date':today_date},provide_context=True,dag=dag)
    
    true_task = DummyOperator(task_id='prefix_True', dag=dag)
    false_task = DummyOperator(task_id='prefix_False', dag=dag)
    
    delete_task = PythonOperator(task_id='prefix_delete',python_callable=delete_prefix_boto,op_kwargs={'s3_prefix':s3_prefix,'s3_partition':s3_partition,'kwargs_date':today_date},dag=dag)
    
    update_yellow_partition = AWSAthenaOperator(task_id="update_yellow_partition",query=yellow_update_partition, database='default', output_location=athena_output)
    
    update_green_partition = AWSAthenaOperator(task_id="update_green_partition",query=green_update_partition, database='default', output_location=athena_output)
    
    stage_table = AWSAthenaOperator(task_id="create_stage_table",query=stage_table, database='default', output_location=athena_output)
    
    insert_aggregate_table = AWSAthenaOperator(task_id="insert_aggregate_table",trigger_rule='none_failed_or_skipped',query=insert_aggregate_table, database='default', output_location=athena_output)
    
    drop_stage_table = AWSAthenaOperator(task_id="drop_stage_table",query=drop_stage_table, database='default', output_location=athena_output)
    
    yellow_taxi >> update_yellow_partition >> stage_table >> s3_prefix_exists >> fork
    green_taxi >> update_green_partition >> stage_table >> s3_prefix_exists >> fork
    fork >> true_task >> delete_task >> insert_aggregate_table >> drop_stage_table
    fork >> false_task >> insert_aggregate_table >> drop_stage_table
  5. Wait for the cron schedule defined in the script to trigger the job.

You can check the job run status on the DAGs page of the Airflow UI.

Choose the DAG to see a visual representation of the workflow.
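
If you don't want to wait for the schedule, you can also trigger the DAG on demand through the Amazon MWAA CLI endpoint. The following sketch assumes an environment named my-mwaa-environment and the requests library installed locally; the CLI command syntax depends on your Airflow version.

import boto3
import requests  # assumed to be installed locally

mwaa = boto3.client('mwaa')

# Exchange AWS credentials for a short-lived Airflow CLI token.
token = mwaa.create_cli_token(Name='my-mwaa-environment')  # placeholder environment name

# Call the Airflow CLI through the MWAA web server to trigger the DAG.
# 'trigger_dag' is Airflow 1.10.x syntax; use 'dags trigger nytaxi-brew-job' on Airflow 2.x environments.
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        'Authorization': f"Bearer {token['CliToken']}",
        'Content-Type': 'text/plain',
    },
    data='trigger_dag nytaxi-brew-job',
)
print(response.status_code, response.text)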

Data validation: Query the Athena tables

You can now test your solution by querying your tables in Athena.

  1. Run the following query on the yellow_taxi table to retrieve 10 sample rows:
    SELECT * FROM default.yellow_taxi LIMIT 10;

    The following screenshot shows the query results.

  2. Run the following query on the green_taxi table to retrieve 10 sample rows:
    SELECT * FROM default.green_taxi LIMIT 10;

    The following screenshot shows the query results.

  3. Run the following query on the aggregate_summary table to retrieve 10 sample rows:
    SELECT * FROM default.aggregate_summary LIMIT 10;

    The following screenshot shows the query results.
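
You can also run these validation queries from a script. The following sketch reuses the boto3 Athena pattern from earlier to run the aggregate_summary sample query and print the returned rows; the query result location is a placeholder.

import time
import boto3

athena = boto3.client('athena')
ATHENA_OUTPUT = 's3://athena-query-result-location/'  # placeholder query result location

# Run the aggregate_summary sample query and fetch the results programmatically.
query = athena.start_query_execution(
    QueryString='SELECT * FROM default.aggregate_summary LIMIT 10',
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': ATHENA_OUTPUT},
)
query_id = query['QueryExecutionId']

state = 'QUEUED'
while state in ('QUEUED', 'RUNNING'):
    time.sleep(2)
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']

if state == 'SUCCEEDED':
    for row in athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])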

Clean up

Complete the following steps to avoid incurring future charges:

  1. On the Amazon MWAA console, on the Environments page, select your environment.
  2. Choose Delete.
  3. Choose Delete again to confirm the deletion.
  4. On the Amazon S3 console, delete the buckets input-bucket-name, output-bucket-name, and airflow-bucket-name.
  5. On the Athena console, run the following commands to delete the tables you created:
    drop table default.yellow_taxi;
    drop table default.green_taxi;
    drop table default.aggregate_summary;
  6. On the DataBrew console, on the Jobs page, select each job and on the Actions menu, choose Delete.
  7. On the Projects page, select each project and on the Actions menu, choose Delete.
  8. Select Delete attached recipe.
  9. Choose Delete again to confirm each deletion.
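
If you scripted the setup, you can script the teardown as well. The following boto3 sketch removes the DataBrew jobs, projects, dataset, and recipes created in this post; the yellow-taxi project name and the recipe names are assumptions based on the naming used in the earlier examples.

import boto3

databrew = boto3.client('databrew')

# Delete the DataBrew jobs and projects created in this post.
for job in ('green-taxi-job', 'yellow-taxi-job'):
    databrew.delete_job(Name=job)
for project in ('green-taxi', 'yellow-taxi'):  # 'yellow-taxi' is an assumed project name
    databrew.delete_project(Name=project)

databrew.delete_dataset(Name='ny-taxi-lookup')

# Delete the published recipe versions, then the working version, for each recipe.
for recipe in ('green-taxi-recipe', 'yellow-taxi-recipe'):  # assumed default recipe names
    versions = databrew.list_recipe_versions(Name=recipe)['Recipes']
    if versions:
        databrew.batch_delete_recipe_version(
            Name=recipe,
            RecipeVersions=[v['RecipeVersion'] for v in versions],
        )
    databrew.delete_recipe_version(Name=recipe, RecipeVersion='LATEST_WORKING')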

Conclusion

This post walked you through the steps to orchestrate DataBrew jobs with Amazon MWAA and schedule the workflow. To illustrate a simple transformation logic pipeline, we used DataBrew jobs to join two datasets, rename a column, and add a new column. We also used Athena to verify the results.

You can use this solution for your own use cases and orchestrate a data transformation pipeline with DataBrew and Amazon MWAA. If you have comments or feedback, please leave them in the comments section.


About the Authors

Sundeep Kumar is a Data Architect, Data Lake at AWS, helping customers build data lake and analytics platforms and solutions. When not building and designing data lakes, Sundeep enjoys listening to music and playing guitar.

Rahul Sonawane is a Principal Specialty Solutions Architect-Analytics at Amazon Web Services.