This blog is a starting point for exploring best practices for using Astronomer Cosmos to gain better observability and streamline your dbt workflows.
While I had dabbled with dbt in the past, this project marked my first real exploration of it with my project team, and I opted to leverage Cosmos.
The Astronomer Cosmos package emerged from a collaboration between Airflow and dbt experts to offer an insightful and feature-rich approach to running dbt projects on your Airflow platform.
This blog will demonstrate how we utilized the Cosmos DbtTaskGroup object to execute dbt snapshots, models, and other dbt commands in Airflow. Additionally, it will dive into some key learnings from implementing it on AWS MWAA. Our focus was on connecting to the Snowflake Data Cloud, with authentication handled via encrypted private keys.
Summary
In summary, Cosmos proves to be robust, but it does require some time investment to establish a workable DAG (Directed Acyclic Graph) due to immature documentation. While the astronomer/cosmos-example repository offers more complete examples, most assume the usage of an Airflow Connection, a dependency we will not leverage here.
Quoting Cosmos from the product page:
“Cosmos is the easiest and most powerful way to integrate dbt + Airflow”
Based on my experience with dbt, I would agree, but I would add “…once you navigate through the Cosmos docs a few times.”
Astronomer Cosmos Resources
Before getting started, here are the resources I used for development:
Astronomer Cosmos docs – These are the official docs. They’re ok.
GitHub – astronomer/astronomer-cosmos – The project is young, so the docs being ok is somewhat expected. Referencing source code was necessary.
Astro CLI – This was instrumental for quick testing, and I cannot recommend it more.
GitHub – dbt-labs/jaffle-shop: An open-source sandbox project exploring dbt workflows via a fictional sandwich shop’s data – the bespoke dbt project used in the examples below.
Requirements and Set Up
Astro CLI
The Astro CLI provided my first experience with Airflow, where creating a DAG felt truly agile. It was a game-changer, allowing me to easily mimic installed packages of MWAA instances, change Airflow versions, and get near-instant feedback for DAG import issues.
To install the Astro CLI, follow the official documentation and make sure Docker Desktop is installed.
After the Astro CLI is installed, you can walk through Get started with Airflow using the Astro CLI | Astronomer Documentation, but here are the specific commands.
First, make sure Docker Desktop is running, then:
astro dev init
Modify the requirements.txt file with the following packages:
dbt-core==1.5.2
dbt-snowflake==1.5.1
astronomer-cosmos==1.1.1
Then start Astro:
astro dev start
Snowflake Account
Key pair authentication requires an encrypted private key and its corresponding public key. Generate them with:
openssl genrsa 2048 | openssl pkcs8 -v2 des3 -topk8 -inform PEM -out rsa_key.p8 -passout pass:mypassphrase
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub -passin pass:mypassphrase
cat rsa_key.pub
Assign the public key to your Snowflake user. For more details on setting up the private key, refer to Key Pair Authentication & Key Pair Rotation | Snowflake Documentation.
dbt Core Project
I’d recommend cloning Jaffle Shop – GitHub – dbt-labs/jaffle-shop: An open-source sandbox project exploring dbt workflows via a fictional sandwich shop’s data – but you can certainly use your own project.
Make sure to see the Sample Profile and add that to your project.
Key Takeaways
If you prefer to skip the narrative, head straight to the Full Sample DAG.
Here are shortcuts to the key takeaways:
Key Takeaway 1: Execution Modes and Environment Variables
Key Takeaway 2: Snapshot and Model Pattern
Key Takeaway 3: Use your own profiles.yml
Key Takeaway 4: Provide the manifest.json
Cosmos DAG Configuration
We leveraged DbtTaskGroup for everything. It allows you to run dbt models and other dbt commands in Airflow while gaining insights into your dbt project via Airflow graphs and task logs.
Initially, we decided to rely on the Getting Started on MWAA — Astronomer Cosmos documentation, but the lack of a concrete example led us to the Full Sample DAG. With one exception: we initially used ExecutionMode.VIRTUALENV.
ExecutionMode.VIRTUALENV vs. LOCAL
The DbtTaskGroup spun up and tore down a temporary Python virtual environment per task (i.e., per dbt model). However, after introducing multiple dbt projects, each with many DAGs and tasks, we observed MWAA disk space issues.
Whether this was an issue with Cosmos or with us running too much in parallel, I cannot say. However, I didn’t encounter any issues locally. Our solution was to switch to ExecutionMode.LOCAL.
Key Takeaway 1: Execution Modes and Environment Variables
The documentation suggests injecting environment variables using operator_args. This works when running ExecutionMode.VIRTUALENV but not in LOCAL mode. The operator accepts the args in LOCAL mode, but the DAG will encounter an import error:
Parsing Error
Env var required but not provided: 'PRIVATE_KEY_PASSPHRASE'
After some stumbling, we realized that setting os.environ["PRIVATE_KEY_PASSPHRASE"] = PRIVATE_KEY_PASSPHRASE was the answer.
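For reference, here is a minimal sketch of what worked for us in LOCAL mode; the Airflow Variable name is illustrative and mirrors the naming pattern used in the Full Sample DAG:

import os
from airflow.models import Variable

# Illustrative Variable name; in LOCAL mode the passphrase must be present in the
# process environment so dbt can resolve env_var("PRIVATE_KEY_PASSPHRASE") at parse time.
PRIVATE_KEY_PASSPHRASE = Variable.get("jaffle_shop_sandbox_private_key_passphrase", default_var=None)
if PRIVATE_KEY_PASSPHRASE:
    os.environ["PRIVATE_KEY_PASSPHRASE"] = PRIVATE_KEY_PASSPHRASE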
VIRTUALENV might be great if you have varying requirements and need teams to be in control. However, if you’re running on MWAA or another managed Airflow platform, you might experience disk space issues from all the temporary virtual environments.
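For context, here is a sketch of the two configurations we toggled between; the operator_args shown mirror those in the Full Sample DAG:

from cosmos import ExecutionConfig, ExecutionMode

# Initial approach: build a fresh virtualenv per task from py_requirements.
virtualenv_execution = ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV)
virtualenv_operator_args = {
    "py_system_site_packages": False,
    "py_requirements": ["dbt-core==1.5.2", "dbt-snowflake==1.5.1"],
}

# What we settled on: reuse the dbt already installed alongside Airflow.
local_execution = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)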
Cosmos DAG Structure
As a general guide, we learned to structure our DAGs as follows:
EmptyOperator for task starts
DbtTaskGroup for snapshots (optional)
DbtTaskGroup for models (add other groups as needed such as for macros)
EmptyOperator for task ends
That is to say, if you need to coordinate anything beyond the dbt run and dbt test commands, you should create and use more than one DbtTaskGroup.
Key Takeaway 2: Snapshot and Model Pattern
Astronomer Cosmos uses dbt lineage to determine which models to run and breaks them into Airflow tasks. As with the dbt CLI, snapshots are separate. To ensure snapshots run first and models after, we just had to use two DbtTaskGroups.
pre = EmptyOperator(task_id="pre_dbt")
snapshots = DbtTaskGroup(
    group_id="snapshots",
    project_config=PROJECT_CONFIG,
    profile_config=PROFILE_CONFIG,
    render_config=RenderConfig(select=["path:snapshots"]),
)
models = DbtTaskGroup(
    group_id="models",
    project_config=PROJECT_CONFIG,
    profile_config=PROFILE_CONFIG,
    render_config=RenderConfig(select=["path:models"]),
)
post = EmptyOperator(task_id="post_dbt")
pre >> snapshots >> models >> post
Note: Failing to use separate DbtTaskGroups for snapshot and model runs will result in snapshots and models running out of order.
Authentication
There are two authentication patterns available with Cosmos: supplying your own profiles.yml or using a Cosmos profile mapping backed by an Airflow Connection.
Cosmos Profiles
While stumbling through Key Takeaway 1: Execution Modes and Environment Variables, I initially tried to avoid environment variables altogether by switching to Cosmos’ profile mappings.
Attempts to use the profile mapping didn’t go well with private key authentication at first – more info on this in SnowflakeEncryptedPrivateKeyPemProfileMapping.
The TL;DR: if you’re getting deserialization errors like the one below, update your package version or just maintain your own profiles.yml.
[2023-11-02, 23:03:11 UTC] {subprocess.py:94} INFO - Database Error
[2023-11-02, 23:03:11 UTC] {subprocess.py:94} INFO - ('Could not deserialize key data. The data may be in an incorrect format, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type (e.g. EC curves with explicit parameters).', [, ])
Realizing this required validating that the connection worked both 1) with the snowsql CLI and 2) when writing the key value manually in the DAG, as shown in the Full Sample DAG.
Key Takeaway 3: Use your own profiles.yml
It’s less setup. Otherwise, you have to maintain an Airflow Connection – see SnowflakeEncryptedPrivateKeyPemProfileMapping.
We ended up writing the encrypted key content to a file during DAG runtime and providing the passphrase to our profiles.yml via environment variables.
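Concretely, the pattern boils down to a sketch like this; the Variable name and path are illustrative, and the full version is in the Full Sample DAG:

from airflow.models import Variable

def write_private_key(content: str, path: str = "/tmp/rsa_key.p8") -> None:
    # Write the encrypted key PEM where private_key_path in profiles.yml expects to find it.
    with open(path, "w") as f:
        f.write(content)

# Illustrative Airflow Variable holding the encrypted private key content.
write_private_key(Variable.get("jaffle_shop_sandbox_encrypted_private_key"))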
Key Takeaway 4: Provide the manifest.json
As our solution scaled up, we encountered significant import issues. This issue emerged quickly as we added more dbt projects and existing ones grew in complexity. Within a couple of days, import times increased drastically, reaching over an hour, which became a clear bottleneck.
Here’s what happened: unless specified, Cosmos determines how to process your dbt projects. With highly complex projects or a large number of them, the computational load shifts to the Airflow Scheduler during import time, leading to substantial delays.
Avoid Import Overhead Scaling Issues
The fix is straightforward: provide the dbt manifest.json. As part of your CI process, you should generate this manifest and include it in your CD pipeline. Doing so drastically reduces the CPU and memory overhead during import time because it spares the Airflow Scheduler from the heavy lifting of parsing the dbt projects.
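As a sketch, generating the manifest in CI (for example, with dbt compile) and pointing Cosmos at it looks roughly like this; the paths mirror the Full Sample DAG:

from cosmos import ProjectConfig, RenderConfig, LoadMode

# Assumes CI ran something like `dbt compile` and shipped target/manifest.json with the project.
DBT_PROJECT_DIR = "/usr/local/airflow/dags/dbt/models/jaffle_shop"

PROJECT_CONFIG = ProjectConfig(
    dbt_project_path=DBT_PROJECT_DIR,
    manifest_path=f"{DBT_PROJECT_DIR}/target/manifest.json",
)

RENDER_CONFIG = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # spare the Scheduler from parsing the dbt project
    dbt_deps=False,
)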
To illustrate, consider these logs showing the import times for a basic Jaffle Shop setup:
With manifest.json provided:

File Path                                     Runtime
-------------------------------------------   ------------
/usr/local/airflow/dags/cosmos_dag.py         0.35s
/usr/local/airflow/dags/cosmos_dag.py         0.32s
/usr/local/airflow/dags/cosmos_dag.py         0.26s

Without manifest.json:

File Path                                     Runtime
-------------------------------------------   ------------
/usr/local/airflow/dags/cosmos_dag.py         6.53s
/usr/local/airflow/dags/cosmos_dag.py         5.82s
/usr/local/airflow/dags/cosmos_dag.py         5.72s
Full Sample DAG
Here is the full DAG we settled on in generic form. It can be used as a starting point for your new dbt project or with the bespoke jaffle_shop dbt project.
In order for this DAG to work, it expects the dbt project directory to be named jaffle_shop and a profiles.yml file like the Sample Profile, along with these Requirements.
Note: It pulls secrets from Airflow Variables. While the Airflow UI masks the values of variables named this way, users with sufficient permissions could still export those sensitive values.
import os
from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProjectConfig, ProfileConfig, RenderConfig, LoadMode
DBT_PROJECT_FOLDER_NAME="jaffle_shop"
DAG_ID = os.path.basename(__file__).replace(".py", "")
ENV = Variable.get("environment", default_var="sandbox")
DEFAULT_ARGS = {
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}
VIRTUAL_ENV_PACKAGES = [
    "dbt-core==1.5.2",
    "dbt-snowflake==1.5.1",
]
DBT_PROJECT_DIR = f"/usr/local/airflow/dags/dbt/models/{DBT_PROJECT_FOLDER_NAME}"
DBT_PROFILES_DIR = f"/usr/local/airflow/dags/dbt/models/{DBT_PROJECT_FOLDER_NAME}/profiles.yml"
PROJECT_CONFIG = ProjectConfig(
    dbt_project_path=DBT_PROJECT_DIR,
    models_relative_path="models",
    seeds_relative_path="seeds",
    snapshots_relative_path="snapshots",
    manifest_path=f"{DBT_PROJECT_DIR}/target/manifest.json",
)
PRIVATE_KEY_PASSPHRASE = Variable.get(f"{DBT_PROJECT_FOLDER_NAME}_{ENV}_private_key_passphrase", default_var=None)
ENCRYPTED_PRIVATE_KEY_CONTENT = Variable.get(f"{DBT_PROJECT_FOLDER_NAME}_{ENV}_encrypted_private_key", default_var=None)
PROFILE_CONFIG = ProfileConfig(
    profile_name="project_profile", target_name=f"{ENV}_pk", profiles_yml_filepath=DBT_PROFILES_DIR
)
EXECUTION_CONFIG = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
OPERATOR_ARGS = {
    "py_system_site_packages": False,
    "py_requirements": VIRTUAL_ENV_PACKAGES,
    "install_deps": False,
    "env": {"PRIVATE_KEY_PASSPHRASE": PRIVATE_KEY_PASSPHRASE},
}
os.environ["PRIVATE_KEY_PASSPHRASE"] = PRIVATE_KEY_PASSPHRASE
@dag(
    dag_id=DAG_ID,
    description=f"DAG to run dbt models for {DBT_PROJECT_FOLDER_NAME}",
    start_date=datetime(2023, 10, 21),
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    max_active_runs=1,
    concurrency=5,
    catchup=False,
    tags=["dbt", DBT_PROJECT_FOLDER_NAME, "SNOWFLAKE"],
)
def run():
    with open("/tmp/rsa_key.p8", "w") as f:
        f.write(ENCRYPTED_PRIVATE_KEY_CONTENT)
    snapshots = DbtTaskGroup(
        group_id="snapshots",
        project_config=PROJECT_CONFIG,
        profile_config=PROFILE_CONFIG,
        execution_config=EXECUTION_CONFIG,
        operator_args=OPERATOR_ARGS,
        render_config=RenderConfig(
            select=["path:snapshots"],
            dbt_deps=False,
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )
    models = DbtTaskGroup(
        group_id="models",
        project_config=PROJECT_CONFIG,
        profile_config=PROFILE_CONFIG,
        execution_config=EXECUTION_CONFIG,
        operator_args=OPERATOR_ARGS,
        render_config=RenderConfig(
            select=["path:models"],
            dbt_deps=False,
            load_method=LoadMode.DBT_MANIFEST,
        ),
    )
    task_start = EmptyOperator(task_id="start")
    task_end = EmptyOperator(task_id="end")
    task_start >> snapshots >> models >> task_end
run()
Sample Profile
project_profile:
  target: sandbox_pk
  outputs:
    sandbox_pk:
      user:
      private_key_path: /tmp/rsa_key.p8
      role:
      database:
      warehouse:
      schema:
      query_tag: sa-dbt-project-template
      private_key_passphrase: '{{ env_var("PRIVATE_KEY_PASSPHRASE") }}'
      type: snowflake
      account:
      client_session_keep_alive: False
      reuse_connections: True
      threads: 1
SnowflakeEncryptedPrivateKeyPemProfileMapping
The Cosmos documentation indicates this profile mapping is available for use with a key path and passphrase, but it does not exist in the package version I installed. I have since learned that it is present in version 1.3.0a1, which pip won’t install unless you explicitly pin to it.
My quick test demonstrates that it does work, although it doesn’t clean up the DAG code, and you still have to write the private key file. The one potential benefit is that you don’t have to maintain the dbt profiles.yml file.
Below is the ProfileMapping snippet if you’re interested – this assumes an Airflow Connection named snowflake exists:
from cosmos.profiles import SnowflakeEncryptedPrivateKeyPemProfileMapping

profile_mapping = SnowflakeEncryptedPrivateKeyPemProfileMapping(
    conn_id="snowflake",
    profile_args={
        # Placeholder values; fill in your own account details.
        "account": "<account>",
        "user": "<user>",
        "database": "<database>",
        "warehouse": "<warehouse>",
        "schema": "<schema>",
        "role": "<role>",
        "private_key_path": "/tmp/rsa_key.p8",
        "private_key_passphrase": PRIVATE_KEY_PASSPHRASE,
    },
)
PROFILE_CONFIG = ProfileConfig(
    # profile_name="project_profile", target_name=f"{ENV}_pk", profiles_yml_filepath=DBT_PROFILES_DIR
    profile_name="project_profile", target_name="my_target_name", profile_mapping=profile_mapping
)
@dag(
    <...>
)
def run():
    with open("/tmp/rsa_key.p8", "w") as f:
        f.write(ENCRYPTED_PRIVATE_KEY_CONTENT)
    <...>
run()
Conclusion
In conclusion, this guide has walked you through integrating the Cosmos DbtTaskGroup object, empowering you to execute dbt snapshots, models, and various dbt commands within the Airflow framework.
If you have any questions or want to share your experiences, don’t hesitate to reach out to the experts at phData! Your journey with Cosmos and dbt workflows is just beginning!
Need additional help running dbt snapshots, models, and other dbt commands in Cosmos? Explore phData’s dbt consulting services today!