December 11, 2023

How to Utilize Astronomer Cosmos to Streamline dbt Workflows

By Mike Menne

This blog serves as a helpful starting point to explore the best practices for utilizing Astronomer Cosmos to gain better observability and streamline your dbt workflows.

While I had dabbled with dbt in the past, this project marked my first real exploration with my project team, and I opted to leverage Cosmos.

The Astronomer Cosmos package emerged from a collaboration between Airflow and dbt experts to offer an insightful and feature-rich approach to running dbt projects on your Airflow platform.

This blog will demonstrate how we utilized the Cosmos DbtTaskGroup object to execute dbt snapshots, models, and other dbt commands in Airflow. Additionally, it will dive into some key learnings encountered during the implementation with AWS MWAA. Our focus was on connecting to the Snowflake Data Cloud, and authentication was achieved via encrypted private keys.

Summary

In summary, Cosmos proves to be robust, but it does require some time investment to establish a workable DAG (Directed Acyclic Graph) due to immature documentation. While the astronomer/cosmos-example repository offers more complete examples, most assume the usage of an Airflow Connection, a dependency we will not leverage here.

Quoting Cosmos from the product page:

“Cosmos is the easiest and most powerful way to integrate dbt + Airflow”

Based on my experience with dbt, I would agree, but I would add “…once you navigate through the Cosmos docs a few times.”

Astronomer Cosmos Resources

Before getting started, here are the resources I used for development:

  1. Astronomer Cosmos docs – These are the official docs. They’re ok.

  2. GitHub – astronomer/astronomer-cosmos – The project is young, so the docs being ok is somewhat expected. Referencing source code was necessary.

  3. Astro CLI – This was instrumental for quick testing, and I cannot recommend it more.

  4. GitHub – dbt-labs/jaffle-shop: An open-source sandbox project exploring dbt workflows via a fictional sandwich shop’s data – it is bespoke.

Requirements and Set Up

Astro CLI

The Astro CLI provided my first experience with Airflow, where creating a DAG felt truly agile. It was a game-changer, allowing me to easily mimic installed packages of MWAA instances, change Airflow versions, and get near-instant feedback for DAG import issues.

To install the Astro CLI, follow the official documentation and have the Docker Desktop installed.

After Astro CLI is installed you can walk through this – Get started with Airflow using the Astro CLI | Astronomer Documentation, but here are the specific commands.

First, make sure Docker Desktop is running then…

				
					astro dev init
				
			

Modify the requirements.txt file with the following packages:

				
					dbt-core == 1.5.2
dbt-snowflake == 1.5.1
astronomer-cosmos==1.1.1

				
			

Then start Astro:

				
					astro dev start
				
			

Snowflake Account

If you don’t have a Snowflake account, go grab a free one. You’ll need a private key for authentication. Use the following commands to quickly generate the keys:

				
					openssl genrsa 2048 | openssl pkcs8 -v2 des3 -topk8 -inform PEM -out rsa_key.p8 -passout pass:mypassphrase
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub -passin pass:mypassphrase
cat rsa_key.pub
				
			

For more details on setting up the private key, refer to Key Pair Authentication & Key Pair Rotation | Snowflake Documentation

dbt Core Project

I’d recommend cloning Jaffle Shop – GitHub – dbt-labs/jaffle-shop: An open source sandbox project exploring dbt workflows via a fictional sandwich shop’s data – but you can certainly use your project.

Make sure to see the Sample Profile and add that to your project.

Key Takeaways

Cosmos DAG Configuration

We leveraged DbtTaskGroup for everything. It allows you to run dbt models and other dbt commands in Airflow while gaining insights into your dbt project via Airflow graphs and task logs.

Initially, we decided to rely on the Getting Started on MWAA — Astronomer Cosmos documentation, but lack of a concrete example led us to the Full Sample DAG.  

With one exception: we initially used ExecutionMode.VIRTUALENV.

ExecutionMode.VIRTUALENV vs. LOCAL

The DbtTaskGroup spun up and tore down a temporary Python virtual environment per task (i.e., per dbt model). However, after introducing multiple dbt projects, each with many dags and tasks, we observed MWAA disk space issues.

Whether this was an issue with Cosmos or us running too much in parallel, I cannot say. However, I didn’t encounter any issues locally. Our solution was to use ExecutionMode.LOCAL.

Key Take Away 1: Execution Modes and Environment Variables

The documentation suggests injecting environment variables using operator_args. This works when running ExecutionMode.VIRTUALENV but not in LOCAL mode. The operator accepts the args in LOCAL mode, but the DAG will encounter an import error:

				
					Parsing Error
  Env var required but not provided: 'PRIVATE_KEY_PASSPHRASE'
				
			

After some stumbling, we realized that setting os.environ["PRIVATE_KEY_PASSPHRASE"] = PRIVATE_KEY_PASSPHRASE was the answer.

VIRTUALENV might be great if you have varying requirements and need teams to be in control. However, if you’re running on MWAA or other Airflow platforms, you might experience disk space issues with all the temporary virtual environments.

Cosmos DAG Structure

As a general guide, we learned to structure our DAGs as follows:

  1. EmptyOperator for task starts

  2. DbtTaskGroup for snapshots (optional) 

  3. DbtTaskGroup for models (add other groups as needed such as for macros)

  4. EmptyOperator for task ends

That is to say, if you need anything coordinated aside from a dbt run  and dbt test commands, you should create and use more than one DbtTaskGroup.

Key Takeaway 2: Snapshot and Model Pattern

Astronomer Cosmos uses dbt lineage to determine what models to run and break them into Airflow tasks. As with dbt CLI, snapshots are separate. To control running snapshots first and models after, we just had to use two DbtTaskGroup’s.

				
					pre = EmptyOperator(task_id="pre_dbt")
snapshots = DbtTaskGroup(
    task_id="snapshots",
    project_config=PROJECT_CONFIG,
    profile_config=PROFILE_CONFIG,
    render_config=RenderConfig(select=["path:snapshots"],
)

models = DbtTaskGroup(
    task_id="models",
    project_config=PROJECT_CONFIG,
    profile_config=PROFILE_CONFIG,
    render_config=RenderConfig(select=["path:models"],
)

post = EmptyOperator(task_id="post_dbt")

pre >> snapshots >> models >> post

				
			

Note: Failure to use separate DbtTaskGroup’s for snapshots and models runs will result in Snapshots and Models running out of order.

Authentication

There are two patterns available with Cosmos as described here:

Cosmos Profiles

While stumbling through Key Takeaway 1: Execution Modes and Environment Variables, I had initially tried to avoid environment variables altogether by switching to Cosmos’ Profiles.

Attempts to use the Profile didn’t go well with the Private Key authentication initially – more info on this in SnowflakeEncryptedPrivateKeyPem.

The TL;DR is if you’re getting deserialization errors like below. Then update your package version or just maintain your own profile. 

				
					[2023-11-02, 23:03:11 UTC] {subprocess.py:94} INFO - Database Error
[2023-11-02, 23:03:11 UTC] {subprocess.py:94} INFO - ('Could not deserialize key data. The data may be in an incorrect format, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type (e.g. EC curves with explicit parameters).', [<OpenSSLError(code=109052072, lib=13, reason=168, reason_text=wrong tag)>, <OpenSSLError(code=109576458, lib=13, reason=524554, reason_text=nested asn1 error)>])
				
			

Realizing this took me validating that connections worked with 1) snowsql CLI and 2) when writing the key value manually in the DAG as shown in the Full Sample DAG.

Key Takeaway 3: Use your own profiles.yml

It’s less setup. Otherwise you have to maintain an Airflow Connection – see SnowflakeEncryptedPrivateKeyPem.

We ended up writing the encrypted key content to a file during DAG runtime and providing the passphrase to our profiles.yml via environment variables.

Full Sample DAG

Here is the full DAG we settled on in generic form. It can be used as a starting point for your new dbt project or with the bespoke jaffle_shop dbt project.

In order for this DAG to work, it expects the dbt project directory to be named jaffle_shop and a profiles.yml file like this Sample Profile and these Requirements.

Note: It pulls secrets from Airflow variables. While the Airflow UI masks values with variables as named, if permissions allow, Airflow UI users could export those sensitive values.

				
					import os
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProjectConfig, ProfileConfig, RenderConfig
# from cosmos.constants import TestBehavior


DBT_PROJECT_FOLDER_NAME="jaffle_shop"
DAG_ID = os.path.basename(__file__).replace(".py", "")
ENV = Variable.get("environment", default_var="sandbox")
DEFAULT_ARGS = {
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}
VIRTUAL_ENV_PACKAGES = [
    "dbt-core==1.5.2",
    "dbt-snowflake==1.5.1",
]
DBT_PROJECT_DIR = f"/usr/local/airflow/dags/dbt/models/{DBT_PROJECT_FOLDER_NAME}"
DBT_PROFILES_DIR = f"https://i0.wp.com/www.phdata.io/usr/local/airflow/dags/dbt/models/{DBT_PROJECT_FOLDER_NAME}/profiles.yml"
PROJECT_CONFIG = ProjectConfig(
    dbt_project_path=DBT_PROJECT_DIR,
    models_relative_path="models",
    seeds_relative_path="seeds",
    snapshots_relative_path="snapshots",
)
PRIVATE_KEY_PASSPHRASE = Variable.get(f"{DBT_PROJECT_FOLDER_NAME}_{ENV}_private_key_passphrase", default_var=None)
ENCRYPTED_PRIVATE_KEY_CONTENT = Variable.get(f"{DBT_PROJECT_FOLDER_NAME}_{ENV}_encrypted_private_key", default_var=None)
PROFILE_CONFIG = ProfileConfig(
    profile_name="project_profile", target_name=f"{ENV}_pk", profiles_yml_filepath=DBT_PROFILES_DIR
)

EXECUTION_CONFIG = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
OPERATOR_ARGS = {
            "py_system_site_packages": False,
            "py_requirements": VIRTUAL_ENV_PACKAGES,
            "install_deps": False,
            "env": {"PRIVATE_KEY_PASSPHRASE": PRIVATE_KEY_PASSPHRASE},
}

# workaround for Cosmos Issue [https://github.com/astronomer/astronomer-cosmos/issues/646](https://github.com/astronomer/astronomer-cosmos/issues/646)
os.environ["PRIVATE_KEY_PASSPHRASE"] = PRIVATE_KEY_PASSPHRASE

@dag(
    dag_id=DAG_ID,
    description=f"DAG to run dbt models for {DBT_PROJECT_FOLDER_NAME}",
    start_date=datetime(2023, 10, 21),
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    max_active_runs=1,
    concurrency=5,
    catchup=False,
    tags=["dbt", DBT_PROJECT_FOLDER_NAME, "SNOWFLAKE"],
)
def run():
    with open("https://i0.wp.com/www.phdata.io/tmp/rsa_key.p8", "w") as f:
        f.write(ENCRYPTED_PRIVATE_KEY_CONTENT)

    snapshots = DbtTaskGroup(
        group_id="snapshots",
        project_config=PROJECT_CONFIG,
        profile_config=PROFILE_CONFIG,
        execution_config=EXECUTION_CONFIG,
        operator_args=OPERATOR_ARGS,
        render_config=RenderConfig(
            select = ["path:snapshots"],
            # test_behavior=TestBehavior.NONE,
            dbt_deps=False
        ),
    )
    models = DbtTaskGroup(
        group_id="models",
        project_config=PROJECT_CONFIG,
        profile_config=PROFILE_CONFIG,
        execution_config=EXECUTION_CONFIG,
        operator_args=OPERATOR_ARGS,
        render_config=RenderConfig(
            select = ["path:models"],
            # test_behavior=TestBehavior.NONE,
            dbt_deps=False
        ),
    )

    task_start = EmptyOperator(task_id="start")

    task_end = EmptyOperator(task_id="end")

    task_start >> snapshots >> models >> task_end


run() 

				
			

Sample Profile

				
					project_profile:
  target: sandbox_pk
  outputs:
    sandbox_pk:
      user: <user>
      private_key_path: /tmp/rsa_key.p8
      role: <role>
      database: <database>
      warehouse: <warehouse>
      schema: <schema>
      query_tag: sa-dbt-project-template
      private_key_passphrase: '{{ env_var("PRIVATE_KEY_PASSPHRASE") }}'

      type: snowflake
      account: <account>
      client_session_keep_alive: False
      reuse_connections: True
      threads: 1
				
			

SnowflakeEncryptedPrivateKeyPemProfileMapping

The Cosmos documentation indicates the availability of this object for use with a key path and passphrase, but it does not exist in the package version I installed. I have since learned that it is present in version 1.3.0a1, which pip won’t install unless you explicitly pin to it. 

My quick test demonstrates that it does work, although it doesn’t clean up the DAG code, and you still have to write the private key file. The one potential benefit is that you don’t have to maintain the dbt profiles.yml file.

Below is the ProfileMapping snip if you’re interested – this assumes an Airflow Connection named snowflake exists:

				
					from cosmos.profiles import SnowflakeEncryptedPrivateKeyPemProfileMapping


profile_mapping = SnowflakeEncryptedPrivateKeyPemProfileMapping(
    conn_id = 'snowflake',
    profile_args = {
        "account": <account>,
        "user": <user>,
        "database": <database>,
        "warehouse": <warehouse>,
        "schema": <schema>,
        "role": <role>,
        "private_key_path": "https://i0.wp.com/www.phdata.io/tmp/rsa_key.p8",
        "private_key_passphrase": PRIVATE_KEY_PASSPHRASE,
    },
)
PROFILE_CONFIG = ProfileConfig(
    # profile_name="project_profile", target_name=f"{ENV}_pk", profiles_yml_filepath=DBT_PROFILES_DIR
    profile_name="project_profile", target_name="my_target_name", profile_mapping=profile_mapping
)


@dag(
    <...>
)
def run():
    with open("https://i0.wp.com/www.phdata.io/tmp/rsa_key.p8", "w") as f:
        f.write(ENCRYPTED_PRIVATE_KEY_CONTENT)


    <...>


run()


				
			

Conclusion

In conclusion, this guide has walked you through the seamless integration of the Cosmos DbtTaskGroup object, empowering you to execute dbt snapshots, models, and various dbt commands within the Airflow framework.

If you have any questions or want to share your experiences, don’t hesitate to reach out to the experts at phData!. Your journey with Cosmos and dbt workflows is just beginning!

Need additional help running dbt snapshots, models, and other dbt commands in Cosmos? Explore phData’s dbt consulting services today!

Data Coach is our premium analytics training program with one-on-one coaching from renowned experts.

Accelerate and automate your data projects with the phData Toolkit