Amazon announced the availability of Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that makes it easy to run Apache Airflow on AWS and to build data processing workflows in the cloud. Apache Airflow is an open-source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as "workflows". Amazon Personalize enables developers to build applications with the same machine learning (ML) technology used by Amazon.com for real-time personalized recommendations, with no ML expertise required. BigQuery is Google's enterprise data warehouse; it enables super-fast SQL queries using the processing power of Google's infrastructure.

This article shows how we can build and manage an ML workflow using Google BigQuery, Amazon MWAA, and Amazon Personalize. We'll build a session-based recommender system that predicts the most popular items for an e-commerce website, based on the product-page traffic tracked by Google Analytics.

High-Level Solution

We'll start by extracting and transforming the data, then build and train a solution version (a trained Amazon Personalize recommendation model) and deploy a campaign. These tasks will be plugged into a workflow that can be orchestrated and automated through Apache Airflow's integration with Amazon Personalize and Google BigQuery. The diagram below represents the workflow we'll implement for building the recommender system.

The workflow consists of the following tasks:

Data Preparation
- Export session and hit data from a Google Analytics 360 account to BigQuery, use SQL to query the Analytics data into a Pandas data frame in the format Personalize expects, and write the data frame as a CSV file directly to S3 (a sample of the resulting file follows this list).

Amazon Personalize Solution
- Create a Personalize dataset group if it doesn't exist.
- Create an Interactions schema for our data if the schema doesn't exist.
- Create a dataset of the 'Interactions' type if it doesn't exist.
- Attach an Amazon S3 policy to your Amazon Personalize role if it doesn't exist.
- Create a Personalize role that has the right permissions if it doesn't exist.
- Create your dataset import jobs.
- Create/update the solution.
- Create/update the campaign.
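For reference, the interactions CSV produced by the data-preparation task looks something like the sketch below. The column set matches the Personalize schema defined later in this article; the row values are illustrative placeholders, not real Analytics data:

```
USER_ID,TIMESTAMP,ITEM_ID,LOCATION,DEVICE,EVENT_TYPE
1194559969,1607558400,blue-t-shirt,Australia,mobile,page_view
1194559969,1607558520,black-hoodie,Australia,mobile,page_view
884201344,1607560080,black-hoodie,New Zealand,desktop,page_view
```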
Before implementing the solution, you have to create an Airflow environment using Amazon MWAA. The following "extra packages" should be included while creating the environment. Please don't include the BigQuery client here; we will install the BigQuery client in the next step.

requirements.txt

```
boto >= 2.49.0
httplib2
awswrangler
google-api-python-client
```

When the new Airflow environment is ready to be used, attach the Personalize policies to the IAM execution role of your environment by running the following CLI command:

```bash
$ aws iam put-role-policy \
    --role-name AmazonMWAA-MyAirflowEnvironment-em53Wv \
    --policy-name AirflowPersonalizePolicy \
    --policy-document file://airflowPersonalizePolicy.json
```

airflowPersonalizePolicy.json

```json
{
    "Version": "2012-10-17",
    "Id": "AirflowPersonalizePolicy",
    "Statement": [
        {
            "Sid": "PersonalizeAccessPolicy",
            "Effect": "Allow",
            "Action": ["personalize:*"],
            "Resource": ["*"]
        },
        {
            "Sid": "S3BucketAccessPolicy",
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:PutBucketPolicy"],
            "Resource": [
                "arn:aws:s3:::airflow-demo-personalise",
                "arn:aws:s3:::airflow-demo-personalise/*"
            ]
        },
        {
            "Sid": "IAMPolicy",
            "Effect": "Allow",
            "Action": ["iam:CreateRole", "iam:AttachRolePolicy"],
            "Resource": ["*"]
        },
        {
            "Effect": "Allow",
            "Action": ["iam:GetRole", "iam:PassRole"],
            "Resource": "arn:aws:iam::*:role/PersonalizeS3Role-*"
        }
    ]
}
```
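If you're not sure which execution role your environment uses in the put-role-policy command above, you can look it up first. A quick sketch, assuming a recent AWS CLI with MWAA support; `MyAirflowEnvironment` is a placeholder for your environment name:

```bash
$ aws mwaa get-environment --name MyAirflowEnvironment \
    --query 'Environment.ExecutionRoleArn'
```

The last segment of the returned ARN is the role name to pass to `--role-name`.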
Installing the Google BigQuery Client

MWAA currently doesn't support the Google Cloud BigQuery client (google-cloud-bigquery) or pandas-gbq with grpc > 1.20. We are not able to install the BigQuery client through requirements.txt: if you put those dependencies into requirements.txt, the pip installation won't install any of the dependencies in the file, and you will hit the following error when running the DAG:

No module named "httplib2"

To resolve this issue, we can package the required Google libraries on a local computer and upload them to S3, download them to the Airflow workers when the BigQuery export task starts, and then dynamically import the required modules given the full file path.

I created a bash file setup.sh and a requirements.txt for the above steps; run the following command:

```bash
$ bash setup.sh
```

setup.sh

```bash
#!/bin/bash
virtualenv -p python3.7 venv
source venv/bin/activate
pip install -r requirements.txt
cd venv/lib/python3.7
zip -r site-packages.zip site-packages/
mv site-packages.zip ../../../site-packages.zip
cd ../../../
deactivate
aws s3 cp site-packages.zip s3://airflow-demo-personalise/
```

requirements.txt

```
cachetools==4.2.0
certifi==2020.12.5
cffi==1.14.4
chardet==3.0.4
google-api-core==1.24.0
google-auth==1.24.0
google-auth-oauthlib==0.4.2
google-cloud-bigquery==2.6.1
google-cloud-bigquery-storage==2.1.0
google-cloud-core==1.5.0
google-crc32c==1.1.0
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
grpcio==1.34.0
idna==2.10
libcst==0.3.15
mypy-extensions==0.4.3
numpy==1.19.4
oauthlib==3.1.0
pandas==1.1.5
pandas-gbq==0.14.1
proto-plus==1.13.0
protobuf==3.14.0
pyarrow==2.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydata-google-auth==1.1.0
python-dateutil==2.8.1
pytz==2020.4
PyYAML==5.3.1
requests==2.25.0
requests-oauthlib==1.3.0
rsa==4.6
six==1.15.0
typing-extensions==3.7.4.3
typing-inspect==0.6.0
urllib3==1.26.2
```

Then copy the following code into the DAG task to import the Google modules dynamically:

```python
s3 = boto3.resource('s3')
logger.info("Download google bigquery and google client dependencies from S3")
s3.Bucket(BUCKET_NAME).download_file(LIB_KEY, '/tmp/site-packages.zip')
with zipfile.ZipFile('/tmp/site-packages.zip', 'r') as zip_ref:
    zip_ref.extractall('/tmp/python3.7/site-packages')
sys.path.insert(1, "/tmp/python3.7/site-packages/site-packages/")

# Import google bigquery and google client dependencies
import pyarrow
from google.cloud import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
```

Next, we will create a Google Cloud connection in the Airflow UI (Admin → Connections): set the Conn Id to bigquery_default, the Conn Type to Google Cloud Platform, and supply your GCP project id and the service-account keyfile JSON.

Now we can use Google BigQuery in the Amazon Managed Airflow workers; let's begin creating the workflow tasks.

Data Preparation

First, export session and hit data from a Google Analytics 360 account to BigQuery and use SQL to query the Analytics data into a Pandas data frame in the format Personalize expects. To prepare an interactions dataset for Personalize, we need to extract the following data from the Google Analytics export in BigQuery:

- USER_ID: In this example, we don't have user data from the e-commerce website's database, but we can use the client id provided by Google Analytics. The client id (cid) is a unique identifier for a browser-device pair that helps Google Analytics link user actions on a site; by default, Google Analytics determines unique users using this parameter. The client id format is a randomly generated 31-bit integer followed by a dot (".") followed by the current time in seconds, so we only need the integer before the dot. BigQuery provides regular expression support, which we can use to extract this session-based user id: REGEXP_EXTRACT(USER_ID, r'(\d+)\.') AS USER_ID (see the short example after this list).
- ITEM_ID: Google Analytics provides the page location (page_location), so we can select product pages with a WHERE clause (page_location LIKE '%/product/%') and extract the product slug as the item id.
- TIMESTAMP: the timestamp data must be in UNIX epoch time format; use TIMESTAMP_TRUNC(TIMESTAMP_MICROS(event_timestamp), ...) to convert the Analytics event_timestamp into the correct format.
- DEVICE: device.category AS DEVICE.
- LOCATION: geo.country AS LOCATION.
- EVENT_NAME: event_name AS EVENT_NAME.
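To make the USER_ID extraction concrete, here is the same regular expression applied in plain Python to a made-up client id (the value is illustrative, not a real cid):

```python
import re

# Hypothetical GA client id: <random integer>.<unix seconds>
cid = "1194559969.1607558400"
user_id = re.match(r"(\d+)\.", cid).group(1)
print(user_id)  # -> "1194559969"
```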
The SQL query in BigQuery is as follows:

```python
BQ_SQL = """
SELECT
  REGEXP_EXTRACT(USER_ID, r'(\d+)\.') AS USER_ID,
  UNIX_SECONDS(EVENT_DATE) AS TIMESTAMP,
  REGEXP_EXTRACT(page_location, r'product/([^?&#]*)') AS ITEM_ID,
  LOCATION,
  DEVICE,
  EVENT_NAME AS EVENT_TYPE
FROM (
  SELECT
    user_pseudo_id AS USER_ID,
    (SELECT value.string_value FROM UNNEST(event_params) WHERE key = "page_location") AS page_location,
    TIMESTAMP_TRUNC(TIMESTAMP_MICROS(event_timestamp), MINUTE) AS EVENT_DATE,
    device.category AS DEVICE,
    geo.country AS LOCATION,
    event_name AS EVENT_NAME
  FROM `lively-metrics-295911.analytics_254171871.events_intraday_*`
  WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
)
WHERE page_location LIKE '%/product/%'
GROUP BY USER_ID, EVENT_DATE, page_location, LOCATION, DEVICE, EVENT_NAME
ORDER BY EVENT_DATE DESC
"""
```

Next, write the data frame as a CSV file directly to S3 using AWS Data Wrangler. The following PythonOperator snippet in the DAG defines the BigQuery-to-S3 task:

```python
def bq_to_s3():
    s3 = boto3.resource('s3')
    logger.info("Download google bigquery and google client dependencies from S3")
    s3.Bucket(BUCKET_NAME).download_file(LIB_KEY, '/tmp/site-packages.zip')
    with zipfile.ZipFile('/tmp/site-packages.zip', 'r') as zip_ref:
        zip_ref.extractall('/tmp/python3.7/site-packages')
    sys.path.insert(1, "/tmp/python3.7/site-packages/site-packages/")

    # Import google bigquery and google client dependencies
    import pyarrow
    from google.cloud import bigquery
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    bq_hook = BigQueryHook(bigquery_conn_id="bigquery_default", use_legacy_sql=False)
    bq_client = bigquery.Client(
        project=bq_hook._get_field("project"),
        credentials=bq_hook._get_credentials()
    )
    events_df = bq_client.query(BQ_SQL).result().to_dataframe(
        create_bqstorage_client=False)
    logger.info(f'google analytics events dataframe head - {events_df.head()}')
    wr.s3.to_csv(events_df, OUTPUT_PATH, index=False)


t_export_bq_to_s3 = PythonOperator(
    task_id='export_bq_to_s3',
    python_callable=bq_to_s3,
    dag=dag,
    retries=1
)
```

Creating a Recommendation Model With Amazon Personalize

In this section, we will build a Personalize solution to identify the most popular items for an e-commerce website integrated with Google Analytics. We will use the Popularity-Count recipe to train our model. Although Personalize supports importing interactions incrementally, we will retrain the model on daily interaction data to get more relevant recommendations.

What we'll cover:

- check_s3_for_key (S3KeySensor): check if the dataset CSV file exists (a sketch of this sensor follows this list).
- t_check_dataset_group (BranchPythonOperator): check if the Personalize dataset group exists; if it does, trigger t_skip_init_personalize, otherwise trigger t_init_personalize.
- t_init_personalize (DummyOperator): trigger the parallel setup tasks if the dataset group doesn't exist (t_create_dataset_group, t_create_schema, t_put_bucket_policies, t_create_iam_role).
- t_create_dataset_group (PythonOperator): create a Personalize dataset group if it doesn't exist.
- t_create_schema (PythonOperator): create an Interactions schema for our data if the schema doesn't exist.
- t_put_bucket_policies (PythonOperator): attach an Amazon S3 policy to your Amazon Personalize role if it doesn't exist.
- t_create_iam_role (PythonOperator): create a Personalize role that has the right permissions if it doesn't exist.
- t_create_dataset_type (PythonOperator): create a dataset of the 'Interactions' type if it doesn't exist.
- t_skip_init_personalize (DummyOperator): downstream task of the branch operator.
- t_create_import_dataset_job (PythonOperator): create the dataset import jobs.
- t_update_solution (PythonOperator): create/update the solution.
- t_update_campagin (PythonOperator): create/update the campaign.
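The dependency chain later in the article references check_s3_for_key, but its definition isn't shown in the snippets. Here is a minimal sketch of what it could look like on the Airflow 1.10 series that MWAA launched with (import paths differ in Airflow 2), assuming OUTPUT_PATH is the full s3:// path the export task writes to:

```python
from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10 import path

# A sketch: poll S3 until the exported CSV appears before branching.
check_s3_for_key = S3KeySensor(
    task_id='check_s3_for_key',
    bucket_key=OUTPUT_PATH,   # full s3://bucket/key path written by bq_to_s3
    wildcard_match=False,
    poke_interval=60,         # check every minute
    timeout=60 * 30,          # give up after 30 minutes
    dag=dag,
)
```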
The PythonOperator callables for the Personalize tasks are defined as follows:

```python
def create_dataset_group(**kwargs):
    create_dg_response = personalize.create_dataset_group(
        name=DATASET_GROUP_NAME
    )
    dataset_group_arn = create_dg_response["datasetGroupArn"]

    status = None
    max_time = time.time() + 2 * 60 * 60  # 2 hours
    while time.time() < max_time:
        describe_dataset_group_response = personalize.describe_dataset_group(
            datasetGroupArn=dataset_group_arn
        )
        status = describe_dataset_group_response["datasetGroup"]["status"]
        logger.info(f"DatasetGroup: {status}")
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(20)
    if status == "ACTIVE":
        kwargs['ti'].xcom_push(key="dataset_group_arn", value=dataset_group_arn)
    if status == "CREATE FAILED":
        raise AirflowFailException(f"DatasetGroup {DATASET_GROUP_NAME} create failed")


def check_dataset_group(**kwargs):
    dg_response = personalize.list_dataset_groups(maxResults=100)
    demo_dg = next((datasetGroup for datasetGroup in dg_response["datasetGroups"]
                    if datasetGroup["name"] == DATASET_GROUP_NAME), False)
    if not demo_dg:
        return "init_personalize"
    else:
        kwargs['ti'].xcom_push(key="dataset_group_arn", value=demo_dg["datasetGroupArn"])
        return "skip_init_personalize"


def create_schema():
    schema_response = personalize.list_schemas(maxResults=100)
    interaction_schema = next((schema for schema in schema_response["schemas"]
                               if schema["name"] == INTERCATION_SCHEMA_NAME), False)
    if not interaction_schema:
        create_schema_response = personalize.create_schema(
            name=INTERCATION_SCHEMA_NAME,
            schema=json.dumps({
                "type": "record",
                "name": "Interactions",
                "namespace": "com.amazonaws.personalize.schema",
                "fields": [
                    {"name": "USER_ID", "type": "string"},
                    {"name": "ITEM_ID", "type": "string"},
                    {"name": "TIMESTAMP", "type": "long"},
                    {"name": "LOCATION", "type": "string", "categorical": True},
                    {"name": "DEVICE", "type": "string", "categorical": True},
                    {"name": "EVENT_TYPE", "type": "string"}
                ]
            }))
        logger.info(json.dumps(create_schema_response, indent=2))
        schema_arn = create_schema_response["schemaArn"]
        return schema_arn
    return interaction_schema["schemaArn"]


def put_bucket_policies():
    s3 = boto3.client("s3")
    policy = {
        "Version": "2012-10-17",
        "Id": "PersonalizeS3BucketAccessPolicy",
        "Statement": [
            {
                "Sid": "PersonalizeS3BucketAccessPolicy",
                "Effect": "Allow",
                "Principal": {"Service": "personalize.amazonaws.com"},
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    "arn:aws:s3:::{}".format(BUCKET_NAME),
                    "arn:aws:s3:::{}/*".format(BUCKET_NAME)
                ]
            }
        ]
    }
    s3.put_bucket_policy(Bucket=BUCKET_NAME, Policy=json.dumps(policy))


def create_iam_role(**kwargs):
    role_name = f"PersonalizeS3Role-{suffix}"
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "personalize.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }
    try:
        create_role_response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document)
        )
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
        )
        role_arn = create_role_response["Role"]["Arn"]
        # sometimes need to wait a bit for the role to be created
        time.sleep(30)
        return role_arn
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
            time.sleep(30)
            return role_arn
        else:
            raise AirflowFailException("PersonalizeS3Role create failed")


def create_dataset_type(**kwargs):
    ti = kwargs['ti']
    schema_arn = ti.xcom_pull(key="return_value", task_ids='create_schema')
    print(schema_arn)
    dataset_group_arn = ti.xcom_pull(key="dataset_group_arn", task_ids='create_dataset_group')
    dataset_type = "INTERACTIONS"
    create_dataset_response = personalize.create_dataset(
        datasetType=dataset_type,
        datasetGroupArn=dataset_group_arn,
        schemaArn=schema_arn,
        name=f"DEMO-metadata-dataset-interactions-{suffix}"
    )
    interactions_dataset_arn = create_dataset_response['datasetArn']
    logger.info(json.dumps(create_dataset_response, indent=2))
    return interactions_dataset_arn


def import_dataset(**kwargs):
    ti = kwargs['ti']
    interactions_dataset_arn = ti.xcom_pull(key="return_value", task_ids='create_dataset_type')
    role_arn = ti.xcom_pull(key="return_value", task_ids='create_iam_role')
    create_dataset_import_job_response = personalize.create_dataset_import_job(
        jobName="DEMO-dataset-import-job-" + suffix,
        datasetArn=interactions_dataset_arn,
        dataSource={"dataLocation": OUTPUT_PATH},
        roleArn=role_arn
    )
    dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
    logger.info(json.dumps(create_dataset_import_job_response, indent=2))

    status = None
    max_time = time.time() + 2 * 60 * 60  # 2 hours
    while time.time() < max_time:
        describe_dataset_import_job_response = personalize.describe_dataset_import_job(
            datasetImportJobArn=dataset_import_job_arn
        )
        dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
        if "latestDatasetImportJobRun" not in dataset_import_job:
            status = dataset_import_job["status"]
            logger.info("DatasetImportJob: {}".format(status))
        else:
            status = dataset_import_job["latestDatasetImportJobRun"]["status"]
            logger.info("LatestDatasetImportJobRun: {}".format(status))
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return dataset_import_job_arn
    if status == "CREATE FAILED":
        raise AirflowFailException("Dataset import job create failed")
```
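These callables, and the operator and DAG definitions that follow, rely on module-level setup that the article's snippets assume but don't show. For completeness, here is a sketch of the imports, clients, and constants implied by the code; only BUCKET_NAME and LIB_KEY follow from the earlier steps, and the remaining values are hypothetical placeholders you'd replace with your own:

```python
import json
import logging
import sys
import time
import zipfile
from datetime import timedelta

import awswrangler as wr
import boto3
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# Implied by earlier steps (bucket from the IAM policy, zip from setup.sh).
BUCKET_NAME = "airflow-demo-personalise"
LIB_KEY = "site-packages.zip"

# Hypothetical names; adjust to your setup.
OUTPUT_PATH = f"s3://{BUCKET_NAME}/ga-interactions.csv"
DATASET_GROUP_NAME = "DEMO-dataset-group"
INTERCATION_SCHEMA_NAME = "DEMO-interaction-schema"  # spelling as in the article's code
SOLUTION_NAME = "DEMO-solution"
CAMPAIGN_NAME = "DEMO-campaign"
suffix = "demo"  # e.g. a per-deployment suffix for role/dataset names

personalize = boto3.client("personalize")
iam = boto3.client("iam")
```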
```python
def update_solution(**kwargs):
    recipe_arn = "arn:aws:personalize:::recipe/aws-popularity-count"
    ti = kwargs['ti']
    dataset_group_arn = ti.xcom_pull(key="dataset_group_arn", task_ids='create_dataset_group')
    list_solutions_response = personalize.list_solutions(
        datasetGroupArn=dataset_group_arn,
        maxResults=100
    )
    demo_solution = next((solution for solution in list_solutions_response["solutions"]
                          if solution["name"] == SOLUTION_NAME), False)
    if not demo_solution:
        create_solution_response = personalize.create_solution(
            name=SOLUTION_NAME,
            datasetGroupArn=dataset_group_arn,
            recipeArn=recipe_arn
        )
        solution_arn = create_solution_response['solutionArn']
        logger.info(json.dumps(create_solution_response, indent=2))
    else:
        solution_arn = demo_solution["solutionArn"]
    kwargs['ti'].xcom_push(key="solution_arn", value=solution_arn)

    create_solution_version_response = personalize.create_solution_version(
        solutionArn=solution_arn,
        trainingMode='FULL'
    )
    solution_version_arn = create_solution_version_response['solutionVersionArn']

    status = None
    max_time = time.time() + 2 * 60 * 60  # 2 hours
    while time.time() < max_time:
        describe_solution_version_response = personalize.describe_solution_version(
            solutionVersionArn=solution_version_arn
        )
        status = describe_solution_version_response["solutionVersion"]["status"]
        logger.info(f"SolutionVersion: {status}")
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return solution_version_arn
    if status == "CREATE FAILED":
        raise AirflowFailException("Solution version create failed")


def update_campagin(**kwargs):
    ti = kwargs['ti']
    solution_version_arn = ti.xcom_pull(key="return_value", task_ids='update_solution')
    solution_arn = ti.xcom_pull(key="solution_arn", task_ids='update_solution')
    list_campagins_response = personalize.list_campaigns(
        solutionArn=solution_arn,
        maxResults=100
    )
    demo_campaign = next((campaign for campaign in list_campagins_response["campaigns"]
                          if campaign["name"] == CAMPAIGN_NAME), False)
    if not demo_campaign:
        create_campaign_response = personalize.create_campaign(
            name=CAMPAIGN_NAME,
            solutionVersionArn=solution_version_arn,
            minProvisionedTPS=2,
        )
        campaign_arn = create_campaign_response['campaignArn']
        logger.info(json.dumps(create_campaign_response, indent=2))
    else:
        campaign_arn = demo_campaign["campaignArn"]
        personalize.update_campaign(
            campaignArn=campaign_arn,
            solutionVersionArn=solution_version_arn,
            minProvisionedTPS=2
        )

    status = None
    max_time = time.time() + 2 * 60 * 60  # 2 hours
    while time.time() < max_time:
        describe_campaign_response = personalize.describe_campaign(
            campaignArn=campaign_arn
        )
        status = describe_campaign_response["campaign"]["status"]
        print("Campaign: {}".format(status))
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return campaign_arn
    if status == "CREATE FAILED":
        raise AirflowFailException("Campaign create/update failed")
```
Each callable is then wrapped in an operator and attached to the DAG (the DAG object itself, with its default arguments, is defined in the next section):

```python
t_check_dataset_group = BranchPythonOperator(
    task_id='check_dataset_group',
    provide_context=True,
    python_callable=check_dataset_group,
    retries=1,
    dag=dag,
)

t_init_personalize = DummyOperator(
    task_id="init_personalize",
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag,
)

t_create_dataset_group = PythonOperator(
    task_id='create_dataset_group',
    provide_context=True,
    python_callable=create_dataset_group,
    retries=1,
    dag=dag,
)

t_create_schema = PythonOperator(
    task_id='create_schema',
    python_callable=create_schema,
    retries=1,
    dag=dag,
)

t_put_bucket_policies = PythonOperator(
    task_id='put_bucket_policies',
    python_callable=put_bucket_policies,
    retries=1,
    dag=dag,
)

t_create_iam_role = PythonOperator(
    task_id='create_iam_role',
    provide_context=True,
    python_callable=create_iam_role,
    retries=1,
    dag=dag,
)

t_create_dataset_type = PythonOperator(
    task_id='create_dataset_type',
    provide_context=True,
    python_callable=create_dataset_type,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    retries=1,
    dag=dag,
)

t_create_import_dataset_job = PythonOperator(
    task_id='import_dataset',
    provide_context=True,
    python_callable=import_dataset,
    retries=1,
    dag=dag,
)

t_skip_init_personalize = DummyOperator(
    task_id="skip_init_personalize",
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag,
)

t_init_personalize_done = DummyOperator(
    task_id="init_personalize_done",
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag,
)

t_update_solution = PythonOperator(
    task_id='update_solution',
    provide_context=True,
    python_callable=update_solution,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    retries=1,
    dag=dag,
)

t_update_campagin = PythonOperator(
    task_id='update_campagin',
    provide_context=True,
    python_callable=update_campagin,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    retries=1,
    dag=dag,
)
```
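Once t_update_campagin completes and the campaign is ACTIVE, an application can request recommendations from it. A minimal sketch using the Personalize runtime API; the campaign ARN and user id below are hypothetical placeholders (in practice you'd read the ARN from the Personalize console or the task's XCom):

```python
import boto3

personalize_runtime = boto3.client("personalize-runtime")

response = personalize_runtime.get_recommendations(
    campaignArn="arn:aws:personalize:us-east-1:123456789012:campaign/DEMO-campaign",  # placeholder
    userId="1194559969",  # session-based user id derived from the GA client id
    numResults=10,
)
# Each itemId is a product slug, as ingested from page_location.
for item in response["itemList"]:
    print(item["itemId"])
```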
In the next section, we'll see how all these tasks are stitched together to form a workflow in an Airflow DAG.

Defining DAG

The tasks above are created using operators like PythonOperator, which runs generic Python code on demand or at a scheduled interval. Now let's set up the DAG with its parameters; a DAG is simply a Python script that contains a set of tasks and their dependencies.

```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['yi.ai@afox.mobi'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml-pipeline',
    default_args=default_args,
    description='A simple ML data pipeline DAG',
    schedule_interval='@daily',
)
```

Next, specify the task dependencies:

```python
t_export_bq_to_s3 >> check_s3_for_key >> t_check_dataset_group
t_check_dataset_group >> t_init_personalize
t_check_dataset_group >> t_skip_init_personalize >> t_init_personalize_done

t_init_personalize >> [
    t_create_dataset_group,
    t_create_schema,
    t_put_bucket_policies,
    t_create_iam_role
] >> t_create_dataset_type

t_create_dataset_type >> t_init_personalize_done
t_init_personalize_done >> t_create_import_dataset_job >> t_update_solution
t_update_solution >> t_update_campagin
```

After triggering the DAG on demand or on a schedule, we can monitor the DAG and its task executions and interact with them directly through the Airflow UI. In the Airflow UI, we can see a graph view of the DAG, which gives a clear representation of how the tasks are executed:

Conclusion

In this article, I introduced how we can build an ML workflow using MWAA and BigQuery. You can extend the workflow by customizing the DAGs, for example by extending the dataset to merge daily data over a certain period (weekly, monthly, etc.), creating parallel tasks that use different recipes, or retraining models on a schedule or on a trigger such as the S3 key sensor.

I hope you have found this article useful. You can find the complete project in my GitHub repo.