May 10, 2024

How to Achieve Source Freshness and Stateful Selection with Cosmos API + dbt Core

By Ashish Paliwal

Cosmos API is a great tool for orchestrating dbt tasks through Airflow. It renders an Airflow DAG from the dbt code, so each dbt model appears as a single task in the DAG, which makes your workflow easier to manage.

We have used the dbt core + Astronomer Cosmos API integration to perform data transformations on Snowflake. However, we came across a few scenarios that are easy to achieve with dbt core/cloud alone but need additional work when dbt core is integrated with Cosmos API.

In this blog, we will show you how we did this work and cover some tips for deploying and integrating dbt with Cosmos API.

Scenarios

  • Run the dbt source freshness command as an Airflow task.

  • Run full refresh on the selected incremental model.

  • Run the View task only if it is modified.

Tech Stack

  • dbt core.

  • Astronomer Managed Airflow.

  • Cosmos API with default Execution Mode (LOCAL).

  • Github Actions for Airflow deployment.

  • Astronomer Docker image with dbt installed as virtual environment.

Where to Start?

Let’s understand some basics before diving into each scenario.

How does dbt work with Cosmos API? We already have a great blog that gives an overview of Cosmos API + dbt core integration.

Basically, the dbt code is part of the Airflow DAGs project (dags directory). We have used DbtTaskGroup to organize dbt resources into separate logical groups, as shown in the code snippet below.

				
    transform_model = DbtTaskGroup(
        group_id="transform_model",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=execution_config,
        render_config=RenderConfig(
            select=["path:models"],
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )
    transform_snapshot = DbtTaskGroup(
        group_id="transform_snapshot",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=execution_config,
        render_config=RenderConfig(
            select=["path:snapshots"],
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )

				
			

During the execution of each task, the dbt command is prepared and executed internally by the Cosmos API. Note that the dbt command is prepared based on the resource type, like model, seed, snapshot, etc. 

As examples:

dbt run command
				
					['/usr/local/airflow/dbt_venv/bin/dbt', 'run', '--models', '<model name>', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
				
			
dbt snapshot command
				
					['/usr/local/airflow/dbt_venv/bin/dbt', 'snapshot', '--models', '<model name>', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
				
			
dbt test command
				
					['/usr/local/airflow/dbt_venv/bin/dbt', 'test', '--models', '<model name>', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
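
To make the command lists above concrete, here is a minimal sketch of how such a command could be assembled per resource type. The helper `build_dbt_command` and the `SUBCOMMAND_BY_RESOURCE` mapping are hypothetical names used only for illustration; Cosmos's internal implementation may differ.

```python
# Hypothetical mapping from dbt resource type to dbt subcommand.
# It covers only the three commands shown above.
SUBCOMMAND_BY_RESOURCE = {
    "model": "run",
    "snapshot": "snapshot",
    "test": "test",
}


def build_dbt_command(resource_type, node_name, *, dbt_executable,
                      profiles_dir, profile, target, full_refresh=False):
    """Assemble a dbt command list in the same shape as the logged examples."""
    subcommand = SUBCOMMAND_BY_RESOURCE[resource_type]
    cmd = [dbt_executable, subcommand, "--models", node_name]
    if full_refresh:
        # Optional flag, placed right after the node selection.
        cmd.append("--full-refresh")
    cmd += ["--profiles-dir", profiles_dir,
            "--profile", profile,
            "--target", target]
    return cmd


command = build_dbt_command(
    "snapshot",
    "<model name>",
    dbt_executable="/usr/local/airflow/dbt_venv/bin/dbt",
    profiles_dir="/usr/local/airflow/dags/dbtlearn",
    profile="dbtlearn",
    target="dev",
)
print(command)
```

The only part that changes between resource types in this sketch is the subcommand; the profile and target arguments stay the same for every task.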
				
			

Scenario #1: Source Freshness Check

In dbt, a source can be configured with a freshness check. We want to ensure that the DAG does not run on stale data and that the sources receive fresh data at the specified intervals. If a source is stale, the task fails and a notification is sent. Learn more details about source freshness.

dbt command to check source freshness is:
				
					dbt source freshness
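
As a rough illustration of what a freshness check decides, the sketch below classifies a source by comparing the age of its most recent record against warn and error thresholds. The function name `freshness_status` and its parameters are assumptions for illustration; dbt itself reads the thresholds from the source's freshness configuration and may treat edge cases differently.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(max_loaded_at, now, warn_after=None, error_after=None):
    """Classify a source as 'pass', 'warn', or 'error' based on data age."""
    age = now - max_loaded_at
    if error_after is not None and age > error_after:
        return "error"
    if warn_after is not None and age > warn_after:
        return "warn"
    return "pass"


# Example values (made up): data last loaded three hours before the check.
now = datetime(2024, 5, 10, 12, 0, tzinfo=timezone.utc)
last_load = datetime(2024, 5, 10, 9, 0, tzinfo=timezone.utc)
status = freshness_status(
    last_load,
    now,
    warn_after=timedelta(hours=2),
    error_after=timedelta(hours=6),
)
print(status)  # warn
```

In an Airflow task, an "error" result corresponds to the case where the task should fail so that downstream transformations do not run on stale data.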
				
			

How to achieve this with Cosmos API?

Unfortunately, there is no direct option to check source freshness in Cosmos API. DbtDag and DbtTaskGroup are commonly used to generate DAGs from dbt projects. Looking deeper, both are built on top of the operators below (similar operators are available for virtual environments).

  • DbtLocalBaseOperator – base operator

  • DbtBuildLocalOperator – dbt build

  • DbtLSLocalOperator – dbt ls

  • DbtSeedLocalOperator – dbt seed

  • DbtSnapshotLocalOperator – dbt snapshot

  • DbtRunLocalOperator – dbt run

  • DbtTestLocalOperator – dbt test

  • DbtRunOperationLocalOperator – dbt run-operation

As you can see, the above list doesn’t include an operator for executing the dbt source command. 

How to execute the dbt source freshness command?

Two options: BashOperator and DbtLocalBaseOperator

Option #1a: Bash Operator

As mentioned in the Astronomer documentation here, BashOperator can be used to execute dbt commands. This requires a profiles.yml (containing the database configuration) in the dbt project directory. It can be further improved by passing the Airflow connection as an environment variable in the DAG, as mentioned here.

Option #1b: Dbt Local Base Operator

Instead of dbt’s profiles.yml, we wanted to use the database connection defined in Airflow, so we went with DbtLocalBaseOperator. Like DbtTaskGroup, DbtLocalBaseOperator uses the database credentials from Airflow to build the dbt profiles.yml.

				
    src_fresh_check = DbtLocalBaseOperator(
        profile_config=profile_config,
        task_id="src_fresh_check_task",
        project_dir=DBT_PROJECT_PATH,
        dbt_executable_path=DBT_EXECUTABLE_PATH,
        base_cmd=["source", "freshness"],
    )

				
			

This works by simply overriding the base command (base_cmd) of the operator class. In fact, any dbt command can be supplied here if more customization is needed.

Scenario #2: Full Refresh of Incremental Models

As we understand, a full refresh on incremental models and seeds is needed for a number of different reasons:

  • Schema change on an incremental model.

  • A new column is added and needs to be backfilled.

  • Transformation logic changed on an incremental model.

  • Schema change of seeds.

Note that if there is no requirement to backfill newly added columns, dbt’s on_schema_change configuration can adapt to schema changes without refreshing/rebuilding the entire model. Learn more about changing columns of an incremental model.

How to apply a full refresh to an incremental model?

Cosmos API has the provision to supply a full_refresh flag to dbt models.

				
    transform_model = DbtTaskGroup(
        group_id="transform_model",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=execution_config,
        operator_args={
            "full_refresh": True,
        },
        render_config=RenderConfig(
            select=["path:models"],
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )

				
			

During the execution of each task, the dbt command (for dbt run) looks like this:

				
					['/usr/local/airflow/dbt_venv/bin/dbt', 'run', '--models', '<model name>', '--full-refresh', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']

				
			

However, there are a few issues with this approach:

  1. It applies the full refresh flag to all incremental models and seeds, so all of them are rebuilt on every DAG run regardless of whether the schema changed. This defeats the purpose of defining models as incremental.

  2. Cosmos API has no provision for referring to the previous state file of the models (the dbt metadata file called manifest.json), so the DAG has no way of knowing whether there is a schema change.

  3. Even if we solve the above issue, it is hard to decide at runtime, within a single DAG, whether to run the task/dbt command with a full refresh.

From the above points, it is clear that a single DAG is not enough to perform a normal run as well as a full refresh on selected incremental models. So, we have introduced another DAG to refresh the incremental models fully.

Here is the functionality of both DAGs:

DAG #1: Transformation DAG
  • Runs every hour and transforms new and/or modified data.

  • Runs the dbt source freshness check.

  • Runs and tests all models, snapshots, and seeds.

  • Runs incremental models without a full refresh.

DAG #2: Full refresh DAG
  • Is not scheduled; it is triggered manually.

  • Runs a full refresh only on modified incremental models and their dependents.

  • Runs a full refresh on seeds if the schema changes.

Let’s jump to the next challenge of state file (manifest.json) management.

dbt generates a metadata file called manifest.json. It represents the current state of all models.

Example:

With dbt core, the command below runs all models that have been modified compared to the previous state.

				
					dbt run --select "state:modified" --state <path_to_metadata_directory>

				
			

Learn more about the stateful selection.
Learn more about manifest.json.
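
Conceptually, stateful selection compares the current manifest.json with the one from the previous state. The sketch below is a simplified approximation that flags a node as modified when it is new or when its file checksum differs. It assumes each node entry in manifest.json carries a `checksum` object, and the helper names are hypothetical. dbt's real state:modified selector considers more than checksums (for example, configuration changes), so treat this purely as an illustration.

```python
def node_checksums(manifest):
    """Map each node id in a parsed manifest.json to its file checksum."""
    return {
        unique_id: node.get("checksum", {}).get("checksum")
        for unique_id, node in manifest.get("nodes", {}).items()
    }


def modified_nodes(previous_manifest, current_manifest):
    """Return ids of nodes that are new or whose checksum changed."""
    previous = node_checksums(previous_manifest)
    current = node_checksums(current_manifest)
    return sorted(
        unique_id
        for unique_id, checksum in current.items()
        if unique_id not in previous or previous[unique_id] != checksum
    )


# Tiny in-memory manifests (made-up ids and checksums) for demonstration.
previous_state = {"nodes": {
    "model.dbtlearn.orders": {"checksum": {"name": "sha256", "checksum": "aaa"}},
    "model.dbtlearn.customers": {"checksum": {"name": "sha256", "checksum": "bbb"}},
}}
current_state = {"nodes": {
    "model.dbtlearn.orders": {"checksum": {"name": "sha256", "checksum": "aaa"}},
    "model.dbtlearn.customers": {"checksum": {"name": "sha256", "checksum": "ccc"}},
    "model.dbtlearn.payments": {"checksum": {"name": "sha256", "checksum": "ddd"}},
}}
print(modified_nodes(previous_state, current_state))
```

With real files, each manifest would first be loaded with `json.load` from the previous-state directory and from the current target directory.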

Airflow’s DAG code needs to be deployed every time there is a change in the dbt models, so we have used the CI/CD pipeline to generate and store manifest.json.

During deployment:

  1. Download the manifest.json of the previous deployment from storage (AWS or Azure) and save it under the Airflow dags directory.

  2. Generate the manifest.json of the current state and upload it to storage (AWS or Azure).

CI/CD workflow steps
				
					
jobs:
  <job_id>:  # placeholder job id
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Install dbt
        run: |
          python -m pip install -r <dbt_dir>/requirements.txt
          dbt deps
      - name: Generate manifest file for current state
        run: |
          cd <dbt_dir>
          dbt ls
      - name: Download dbt manifest of previous state
        run: |
          az storage fs file download --path manifest.json \
                                --destination dags/<new_dir>/
      - name: Upload dbt manifest of current state
        run: |
          az storage fs file upload --path manifest.json \
                            --source <dbt_dir>/target/manifest.json \
                            --overwrite true
      - name: Upload same dbt manifest into archive
        run: |
          az storage fs file upload --path <current_date_time_dir>/manifest.json \
                            --source <dbt_dir>/target/manifest.json
      - name: Deploy to Astro
        uses: astronomer/deploy-action@v0.3
				
			

Steps:

  1. Checkout repository.

  2. Install dbt and its dependencies. 

  3. Run the dbt ls command to generate manifest.json. It will be generated in the target directory.

  4. Download the manifest.json of the previous deployment from Azure storage.

  5. Upload (overwrite) manifest.json of current deployment into Azure storage.

  6. Upload manifest.json of the current deployment into a date-time directory in Azure storage (archive).

  7. Deploy to Astro.

Once deployed, manifest.json is now part of DAG’s code and will be referred to while rendering DAG #2.
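
The manifest rotation performed during deployment can also be sketched locally. The example below is a minimal sketch that uses a plain directory as a stand-in for the cloud storage container; the function name `rotate_manifest`, its parameters, and the timestamp format of the archive directory are assumptions made for illustration, not part of the actual pipeline.

```python
import shutil
from datetime import datetime, timezone
from pathlib import Path


def rotate_manifest(storage_dir, current_manifest, state_dir, now=None):
    """Mimic the download / overwrite / archive steps with local directories."""
    storage_dir = Path(storage_dir)
    current_manifest = Path(current_manifest)
    state_dir = Path(state_dir)
    now = now or datetime.now(timezone.utc)

    storage_dir.mkdir(parents=True, exist_ok=True)
    state_dir.mkdir(parents=True, exist_ok=True)
    latest = storage_dir / "manifest.json"

    # Download: copy the previous deployment's manifest next to the DAG code.
    if latest.exists():
        shutil.copy2(latest, state_dir / "manifest.json")

    # Upload (overwrite): the current manifest becomes the new "latest".
    shutil.copy2(current_manifest, latest)

    # Archive: keep a timestamped copy of the current manifest.
    archive_dir = storage_dir / now.strftime("%Y%m%dT%H%M%SZ")
    archive_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy2(current_manifest, archive_dir / "manifest.json")
    return archive_dir
```

The order matters: the previous manifest has to be fetched before the current one overwrites it, otherwise the deployed DAG would compare the current state with itself and find no modified models.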

DAG #2 code:
				
    full_refresh = DbtTaskGroup(
        group_id="full_refresh_group",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=execution_config,
        operator_args={
            "full_refresh": True,
        },
        render_config=RenderConfig(
            load_method=LoadMode.DBT_LS,
            select=["state:modified+"],
            env_vars={
                # dir path of previous state manifest.json under dags directory
                "DBT_STATE": DBT_METADATA_PATH,
            },
        ),
    )

				
			

Let’s review RenderConfig:

  • LoadMode.DBT_LS: env_vars is not supported by the other parsing methods, such as LoadMode.DBT_MANIFEST or LoadMode.AUTOMATIC (default). Note that LoadMode.DBT_LS is slower than LoadMode.DBT_MANIFEST and should not be used if the DAG runs frequently, for example every hour.

  • env_vars: Passes environment variables to dbt. Cosmos API does not provide a parameter for the path of manifest.json, so we have used the dbt environment variable DBT_STATE to define that path.

  • select: Selects the modified nodes/models based on manifest.json. Note that this selection is applied while rendering the DAG, not at execution time.

Once deployed, the manifest.json files are compared, and DAG #2 renders only modified models as a task. If the Airflow project (with dbt code) is deployed multiple times with no changes in models, DAG#2 will not have any task.
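
The trailing "+" in state:modified+ extends the selection to everything downstream of the modified nodes. A minimal sketch of that expansion is shown below, assuming a mapping from each node id to its direct children (manifest.json contains a `child_map` section of this shape). The function name `with_descendants` and the node ids are hypothetical.

```python
from collections import deque


def with_descendants(modified, child_map):
    """Return the modified nodes plus every node downstream of them."""
    selected = set(modified)
    queue = deque(modified)
    while queue:
        node = queue.popleft()
        for child in child_map.get(node, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Made-up dependency graph: stg_orders -> orders -> orders_summary.
child_map = {
    "model.dbtlearn.stg_orders": ["model.dbtlearn.orders"],
    "model.dbtlearn.orders": ["model.dbtlearn.orders_summary"],
    "model.dbtlearn.orders_summary": [],
    "model.dbtlearn.customers": [],
}

print(sorted(with_descendants(["model.dbtlearn.orders"], child_map)))
print(with_descendants([], child_map))  # nothing modified, nothing selected
```

An empty list of modified nodes yields an empty selection, which corresponds to DAG #2 rendering without any tasks.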

Scenario #3: View Recreation

DAG #1 is scheduled every hour and runs all models, including views. If a view definition has not changed, recreating the view is an unnecessary use of compute (small per run, but it can add up over time). Because DAG #2 can now handle modified models (including views), we can skip view-related tasks in DAG #1 entirely.

 

Use the exclude parameter of RenderConfig to filter views from rendering on DAG #1.

				
    transform_model = DbtTaskGroup(
        group_id="transform_model",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=execution_config,
        render_config=RenderConfig(
            select=["path:models"],
            exclude=["config.materialized:view"],
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )

				
			

No changes required on DAG #2. Remember, DAG #2 renders a model based on manifest.json. In case there is a change in view definition, it will be rendered into the DAG #2.
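
To illustrate what excluding config.materialized:view amounts to, the sketch below splits the model nodes of a parsed manifest into views and non-views. It assumes each node exposes `resource_type` and `config.materialized`, as in manifest.json; the function name `split_views` and the sample nodes are made up for illustration and are not how Cosmos implements the selector.

```python
def split_views(nodes):
    """Return (non_view_ids, view_ids) for the model nodes of a manifest."""
    non_views, views = [], []
    for unique_id, node in nodes.items():
        if node.get("resource_type") != "model":
            continue  # seeds, snapshots, and tests are not affected
        materialized = node.get("config", {}).get("materialized")
        if materialized == "view":
            views.append(unique_id)
        else:
            non_views.append(unique_id)
    return sorted(non_views), sorted(views)


sample_nodes = {
    "model.dbtlearn.stg_orders": {
        "resource_type": "model", "config": {"materialized": "view"}},
    "model.dbtlearn.orders": {
        "resource_type": "model", "config": {"materialized": "incremental"}},
    "seed.dbtlearn.countries": {
        "resource_type": "seed", "config": {"materialized": "seed"}},
}

hourly_models, skipped_views = split_views(sample_nodes)
print(hourly_models)
print(skipped_views)
```

In this picture, the first list is what DAG #1 keeps running every hour, while the views in the second list are rebuilt only when DAG #2 detects a change.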

Miscellaneous Configuration Definition

				
					profile_config = ProfileConfig(
   profile_name="dbtlearn",
   target_name="dev",
   profile_mapping="<Define mapping based on Database authentication type>"
)

project_config = ProjectConfig(
   dbt_project_path="<dbt project path under dags directory>",
   manifest_path="<path of manifest.json of current state. Usually target dir>",
)

execution_config = ExecutionConfig(
   dbt_executable_path="<dbt executable path>",
)

				
			

Deployment Steps

Note that both DAGs should not run at the same time. Before every deployment, the scheduled DAG should be paused. If there is a need for full refresh runs or View recreation, then first run DAG#2 manually and then unpause DAG#1.
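
The pause, run, and unpause sequence can be scripted. The sketch below only builds the HTTP requests, assuming the Airflow 2 stable REST API (PATCH /api/v1/dags/{dag_id} to pause or unpause, POST /api/v1/dags/{dag_id}/dagRuns to trigger a run). The base URL, DAG ids, and function names are hypothetical, authentication is omitted, and a managed deployment may expose the API under a different URL.

```python
import json
from urllib.request import Request


def pause_request(base_url, dag_id, paused):
    """Build a request that pauses or unpauses a DAG."""
    return Request(
        url=f"{base_url}/api/v1/dags/{dag_id}",
        data=json.dumps({"is_paused": paused}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PATCH",
    )


def trigger_request(base_url, dag_id):
    """Build a request that triggers a manual DAG run."""
    return Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": {}}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def deployment_steps(base_url, scheduled_dag_id, refresh_dag_id, needs_refresh):
    """Return the ordered (label, request) pairs for one deployment."""
    steps = [("pause scheduled DAG", pause_request(base_url, scheduled_dag_id, True))]
    # The deployment itself happens between pausing and the next step.
    if needs_refresh:
        steps.append(("run full refresh DAG", trigger_request(base_url, refresh_dag_id)))
    # The caller must wait for the full refresh run to finish before
    # sending this last request, so that the two DAGs never overlap.
    steps.append(("unpause scheduled DAG", pause_request(base_url, scheduled_dag_id, False)))
    return steps


for label, request in deployment_steps(
    "http://localhost:8080", "transformation_dag", "full_refresh_dag", True
):
    print(label, request.get_method(), request.full_url)
```

Each request would be sent with `urllib.request.urlopen` (or any HTTP client) once credentials are added.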

Conclusion

Overall, we have built an out-of-box solution that adds flexibility to DAG task/dbt model execution. Incremental full refreshes and view recreation for specific resources can now run in a controlled fashion without impacting regular runs. With the help of dbt metadata files, DAG #2 is smart enough to generate tasks only for modified models. If there is no change, no tasks are rendered in DAG #2, which avoids side effects if it is triggered accidentally.

Because dbt metadata files are now available on the server, more customized solutions are possible, though they will be more static in nature (because the metadata files are generated during deployment).
