May 22, 2023

How to Git Version Control dbt Cloud Jobs

By Arnab Mondal

This blog was co-written by Arnab Mondal & David Beyer.

dbt significantly changed the integrated development environment and how we transform data with the launch of dbt Cloud. This next-generation service provides a managed infrastructure and development environment that streamlines dbt development and orchestration, removing much of the overhead of running dbt on your own infrastructure.

With the power of dbt Cloud, customers can get straight to creating and executing models seamlessly.

Other key features include dbt jobs and a scheduler, which let you create jobs that execute dbt commands ad hoc or on a fixed interval. This changed how dbt jobs are maintained and is one of the most well-received features of dbt Cloud.

In this article, we will discuss how to take this powerful feature and make it even better!

Why is Git Versioning so Critical?

Imagine that everything is running smoothly in production and all other environments, and then it is discovered that one service is not working as expected and needs to be fixed as soon as possible. Wouldn't it be helpful to have a record of the code and every revision from a historical point of view?

This may be achieved by executing manual backups or using Git to version your code, which enables simple rollback capability and allows your code to have automatic backups at all times. 

This automatic backup and rollback capability is why we use version control, and providers such as GitHub and Bitbucket give us the hosted repositories needed to achieve this goal.

dbt Cloud allows you to connect a repository for the code you develop in the Integrated Development Environment (IDE), but it does not version control jobs themselves by default. Below, we will discuss how to use dbt Cloud's Application Programming Interface (API) as a workaround that makes this powerful connection possible.

How to Get Started With dbt Cloud API

The dbt Cloud REST API is not available for trial accounts. In order to utilize this feature, you will need a dbt Cloud license, and your user must belong to one of the following permission sets to get an API key:

  • Account Admin
  • Project Creator
  • Admin
  • Job Admin
  • Developer

Fetching the API Key

To get started, you first need to log in to dbt Cloud and click on the top right Cog Wheel icon which represents the dbt Cloud settings. You will then be redirected to the profile page of your account. 

Next, scroll down to the bottom of the page to find the API key. If you’re not seeing any API key, you’ll need to contact your admin to give you sufficient privileges to access the API key.

Validating the API Key

Once you have an API key, you can review the dbt Cloud API documentation to learn more about the data available through the API. The code we will use here is Python, but you can call the API from any programming language capable of making HTTP requests.

Code to Initialize the API Header:

import requests
import json

headersAuth = {
    'Authorization': 'Bearer ' + '<your_api_key>',
}

Code to Get All Account Details:

response = requests.get('https://cloud.getdbt.com/api/v2/accounts/', headers=headersAuth)
res = response.json()
print(json.dumps(res, indent=4))

Output:
{
    "status": {
        "code": 200,
        "is_success": true,
        "user_message": "Success!",
        "developer_message": ""
    },
    "data": [
        {
            "docs_job_id": null,
            "freshness_job_id": null,
            "id": 1, ← Account ID
            "name": "dbt_Jobs_Demo",
            "state": 1,
            "plan": "enterprise",
            …
        }
    ],
    "extra": {
        "filters": {
            "pk__in": [
                1, ← Account ID
            ]
        },
        "order_by": null,
        "pagination": {
            "count": 1,
            "total_count": 1
        }
    }
}

Once the API is authenticated with the key and we have verified it is working properly, we can then proceed to the next steps. Before jumping into our primary discussion, we must familiarize ourselves with the API and some specific functionalities. 

In particular, we will want to understand the Create, Read, Update, and Delete (CRUD) operations and how to perform them.

Collecting Prerequisite Information Needed to Create a Job

To create a dbt Cloud job, we need three IDs that we can easily collect from the API. One is the Account ID (which we demonstrated how to fetch in the previous step), but now we need to fetch the specific Project ID and Environment ID for the location where we intend to create our job. 

These details could also be retrieved from the dbt User Interface (UI), but the result is the same when completing the operation via an API endpoint. Once you have the Account_ID, you can get the Project_ID by listing all the projects as demonstrated below.

Code to List All Projects in an Account:

account_id = 1  # the Account ID fetched in the previous step
response = requests.get(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/projects/', headers=headersAuth)
res = response.json()
print(json.dumps(res, indent=4))

Output:
{
  "data": [
    {
      "id": 100,
      "account_id": 1,
      "connection": {
        "id": 0,
        "account_id": 0,
        "project_id": 0,
        "name": "string",
        "type": "postgres",
        "state": 0,
        "created_by_id": 0,
        "created_by_service_token_id": 0,
        "created_at": "2019-08-24T14:15:22Z",
        "updated_at": "2019-08-24T14:15:22Z",
        "details": {
          "project_id": "string",
          "timeout_seconds": 0,
          "private_key_id": "string",
          "private_key": "string",
          "client_email": "string",
          "client_id": "string",
          "auth_uri": "string",
          "token_uri": "string",
          "auth_provider_x509_cert_url": "string",
          "client_x509_cert_url": "string"
        }
      },
      "connection_id": 5000,
      "dbt_project_subdirectory": "analytics/dbt-models",
      "name": "Analytics",
      "repository": {
        "id": 200,
        "account_id": 1,
        "remote_url": "git@github.com:fishtown-analytics/jaffle_shop.git",
        "remote_backend": "string",
        "git_clone_strategy": "azure_active_directory_app",
        "deploy_key_id": 0,
        "github_installation_id": 0,
        "state": 0,
        "created_at": "2023-08-24T14:15:22Z",
        "updated_at": "2023-08-24T14:15:22Z"
      },
      "repository_id": 1,
      "state": 0,
      "created_at": "2023-08-24T14:15:22Z",
      "updated_at": "2023-08-24T14:15:22Z"
    }
  ],
  "status": {
    "code": 200,
    "is_success": true,
    "user_message": "string",
    "developer_message": "string"
  }
}

BONUS phData CODE! Get Details for a Specific Project:

response = requests.get(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/projects/', headers=headersAuth)

res = response.json()
project_id = 0
for i in res["data"]:
    print(i["name"])
    if i["name"] == "<Project Name you are looking for>":
        project_id = i["id"]
        print(json.dumps(i, indent=4))

Output:

{
  "name": "<Project Name you are looking for>",
  "account_id": 1,
  "connection_id": 1,
  "repository_id": 1,
  "id": 1,
  "created_at": "2022-10-05 14:25:12.947824+00:00",
  "updated_at": "2023-03-03 20:11:37.787216+00:00",
  "skipped_setup": true,
  "state": 1,
  "dbt_project_subdirectory": null,
  "connection": {
    "id": 1,
    "account_id": 1,
    "project_id": 1,
    "name": "DEMO_dbt_JOBS",
    "type": "snowflake",
    "created_by_id": 1,
    "created_by_service_token_id": null,
    "details": {
      "account": "111111.us-east-1",
      "database": "DEMO_DB",
      "warehouse": "DEmo_WH",
      "allow_sso": true,
      "client_session_keep_alive": false,
      "role": "DEV_DB"
    },
    "state": 1,
    "created_at": "2022-10-05 20:55:37.172108+00:00",
    "updated_at": "2023-02-16 05:46:20.865982+00:00",
    "private_link_endpoint_id": null
  },
  "repository": {
    "id": 1,
    "account_id": 1,
    "project_id": 1,
    "full_name": "DEMO_DB",
    "remote_url": "https://DEMO_DB",
    "remote_backend": "azure_active_directory",
    "git_clone_strategy": "azure_active_directory_app",
    "deploy_key_id": null,
    "repository_credentials_id": null,
    "github_installation_id": null,
    "pull_request_url_template": "https://dev.azure.com/DEMO_DB",
    "state": 1,
    "created_at": "2023-03-03 20:11:37.324019+00:00",
    "updated_at": "2023-03-03 20:11:37.324043+00:00",
    "deploy_key": null,
    "github_repo": null,
    "name": "DEMO_DB",
    "git_provider_id": 1,
    "gitlab": null,
    "git_provider": null
  },
  "group_permissions": [],
  "docs_job_id": null,
  "freshness_job_id": null,
  "docs_job": null,
  "freshness_job": null
}

At this point, we have built our code as a functional wrapper on top of the publicly available endpoints and added proper authentication, error handling, and other safeguards to make everything production-ready, since this solution is used against an active data warehouse.
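As a simple illustration of that wrapping, a helper along these lines keeps authentication and error handling in one place (the function name, timeout, and error message are our own choices, not part of the dbt Cloud API):

import requests

def dbt_api_get(url, headers, timeout=30):
    """GET a dbt Cloud endpoint and fail loudly on HTTP or payload errors."""
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()  # raise on 4xx/5xx responses
    payload = response.json()
    if not payload.get("status", {}).get("is_success", False):
        raise RuntimeError(f"dbt Cloud API error: {payload.get('status')}")
    return payload["data"]

# Example usage (same accounts endpoint as above):
# accounts = dbt_api_get('https://cloud.getdbt.com/api/v2/accounts/', headersAuth)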

Understanding How to Execute CRUD Operations on a dbt Cloud Job

Next, we will review the steps necessary to perform the basic CRUD (CREATE, READ, UPDATE, DELETE) operations of a dbt Cloud job using the API. We will begin by creating a job by utilizing a POST operation with the following body as JSON: 

Creating a dbt Cloud Job

body = {
    "execution": {
        "timeout_seconds": 0
    },
    "generate_docs": False,
    "run_generate_sources": False,
    "id": None,
    "account_id": 1,
    "project_id": 1,
    "environment_id": 1,
    "name": "TEST_DEMO_CODE",
    "dbt_version": None,
    "execute_steps": [
        "dbt build --select demo_model"
    ],
    "state": 1,
    "deactivated": False,
    "run_failure_count": 0,
    "deferring_job_definition_id": None,
    "triggers": {
        "github_webhook": False,
        "git_provider_webhook": False,
        "custom_branch_only": True,
        "schedule": False
    },
    "settings": {
        "threads": 4,
        "target_name": "Test"
    },
    "schedule": {
        "cron": "0 0 * */3 MON",
        "date": {
            "type": "custom_cron",
            "cron": "0 0 * */3 MON"
        },
        "time": {
            "type": "every_hour",
            "interval": 1
        }
    }
}

*Note: The ID of the Job should be NULL while creating it. This appears as a “None” value in the Python programming language. If you are using a different language, be sure to verify the NULL equivalent of that specific programming language.

Next, you need to send the above body in a POST request like this:

response = requests.post(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/', headers=headersAuth, json=body)

Output (on success):

{
    "status": {
        "code": 201,
        "is_success": true,
        "user_message": "Success!",
        "developer_message": ""
    },
    "data": {
        "execution": {
            "timeout_seconds": 0
        },
        "generate_docs": false,
        "run_generate_sources": false,
        "id": <You Will get a Job_ID created just now>,
        "account_id": 1,
        "project_id": 1,
        "environment_id": 1,
        "name": "TEST_DEMO_CODE",
        "dbt_version": null,
        "raw_dbt_version": null,
        "created_at": "2023-02-27T07:51:03.689278+00:00",
        "updated_at": "2023-02-27T07:51:03.689300+00:00",
        "execute_steps": [
            "dbt build --select demo_model"
        ],
        "state": 1,
        "deactivated": false,
        "run_failure_count": 0,
        "deferring_job_definition_id": null,
        "lifecycle_webhooks": false,
        "lifecycle_webhooks_url": null,
        "triggers": {
            "github_webhook": false,
            "git_provider_webhook": false,
            "custom_branch_only": true,
            "schedule": false
        },
        "settings": {
            "threads": 4,
            "target_name": "Test"
        },
        "schedule": {
            "cron": "0 0 * */3 MON",
            "date": {
                "type": "custom_cron",
                "cron": "0 0 * */3 MON"
            },
            "time": {
                "type": "every_hour",
                "interval": 1
            }
        },
        "is_deferrable": false,
        "generate_sources": false,
        "cron_humanized": "At 00:00 AM, only on Monday, every 3 months",
        "next_run": null,
        "next_run_humanized": null
    }
}

*Note: You will need to store the Job ID from the response if you intend to perform any future operations like Update and Delete.

BONUS phData CODE! Function to Create a Job and Store the Job ID:

def create_job(body):
    response = requests.post(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/', headers=headersAuth, json=body)

    print(response)
    data = response.json()

    job_id = data["data"]["id"]
    print("The Job ID is: ", job_id)

    return job_id

Output:

<Response [201]>
The Job ID is: 1437834341

Reading the dbt Cloud Job Created

With the Job ID that was created in the previous step, you can now perform multiple operations such as running a job, checking the status of a job, getting the artifacts from a job, or even fetching the job configurations by making an API call. 

This will be useful when we dive into job versioning, though there we will not fetch jobs one at a time, as doing so would increase the number of API calls.

You can use the following code to fetch the details of the job you recently created: 

requests.get(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/', headers=headersAuth)
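If you want to keep that configuration around for later comparison (as the versioning logic below will), one way is to capture the data portion of the response; the variable name job_config is purely illustrative:

response = requests.get(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/', headers=headersAuth)
job_config = response.json()["data"]  # the full job definition as stored in dbt Cloud
print(json.dumps(job_config, indent=4))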

Updating the dbt Cloud Job Created

To update the dbt Cloud job that was created, you need to complete a POST operation, which is similar to the creation of a job, but this time you need to pass the Job ID in the JSON payload. 

You can use the previous JSON body and simply apply the appropriate changes. Make sure to set the specific Job ID in the body, or you will get an error message. You can then use the following endpoint to update a job:

requests.post(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/', headers=headersAuth, json=body_update)
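For example, starting from the creation payload shown earlier, a minimal update could look like the sketch below; the changed name and selector are purely illustrative:

import copy

body_update = copy.deepcopy(body)           # reuse the creation payload from earlier
body_update["id"] = job_id                  # the Job ID returned when the job was created
body_update["name"] = "TEST_DEMO_CODE_V2"   # illustrative change
body_update["execute_steps"] = ["dbt build --select demo_model+"]

response = requests.post(
    f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/',
    headers=headersAuth,
    json=body_update,
)
print(response.json()["status"])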

Deleting the dbt Cloud Job Created

You can also delete the job you made with the following endpoint: 

requests.delete(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/', headers=headersAuth)

EXTRA: Executing the dbt Cloud Job Created

We have reviewed all of the CRUD operations, but the dbt Cloud API also has endpoints for executing a job and managing its artifacts. You can use the following endpoint to execute or trigger a job via a POST operation:

requests.post(f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/', headers=headersAuth, json=cause)

A POST operation is used here because you need to send a cause for executing the job, which is a required parameter.

You can put the reason in a JSON object like this: 

cause = {
    "cause": "Kicked off from dbt Cloud API by Arnab"
}

Git Versioning the dbt Cloud Jobs

Now that you have a basic understanding of how the API works, we will explore how we can use the API to our advantage for version controlling dbt Cloud jobs. You can use any repository you like, as there are no restrictions, and we’ll review the necessary steps below.

Creating a Repository

The first step is to create a repository on your platform of choice, as mentioned earlier. It is critical to ensure that dbt Cloud jobs are only modified through the repository, and manual changes to job definitions should be avoided as much as possible.

It may not be possible to prevent all manual changes, but keeping them to a minimum will ensure that Git versioning provides the capability we want. This matters because dbt Cloud currently has no UI lock option, nor any feature that prevents changes through the UI (there is no way to require that changes be made only via the API).

How Does the Code Work? 

We have implemented the code to execute in three different modes, with Difference mode as the default. Below, we review each of the three modes.

Difference Mode

If you run the code without any parameters, it runs in Difference mode. In this mode, it compares a local file with the jobs that exist online in dbt Cloud. You can provide a JSON file to compare against as an inline argument; otherwise, the script uses 'jobs_data.json' as the default.

The main purpose of this mode is to run daily and generate log reports comparing the expected state of all online dbt jobs against their observed state at the time of execution. It records the CRUD operations that would be needed if anything has changed in the dbt Cloud workspace.

For example, we could analyze if a dbt job got deleted by mistake or if a job was updated to something else, and now we need to revert it. Using this mode, we can generate the report and see the explicit changes. 

Snapshot Mode

Running the code with '--snapshot' will take a snapshot, at that moment, of all the online dbt jobs. You can also pass a JSON file where you want the output to go; by default, the output is written to the 'jobs_data.json' file. This mode is helpful for understanding point-in-time behavior.

It is considered best practice to execute this mode the first time you run the code, so that any dbt jobs that already exist online are captured before you implement anything new.

Executing this mode initially also creates the default JSON file that the other modes use. If you prefer not to use the default JSON file, you can skip this step and provide your own files.

Implementation Mode

Running the code with '--start' will read the differences and implement the necessary CRUD operations. You can pass a JSON file indicating where the differences should be read from; by default, they are read from the 'jobs_data.json' file.

Running in this mode will implement all the changes you found in the report when executing the code in the Difference mode. It will perform all CRUD operations on your behalf using the API calls discussed above.

The Code

I personally used Python to build this specific solution, but readers are not limited to one language, and I encourage those who prefer other languages such as JavaScript, Scala, etc., to use the language of their choice.

To that end, we will break down the code into finite logical blocks that clearly demonstrate the required functionality (regardless of language) for each section of logic.

Step 1: Functions and Endpoints

We will need one more API endpoint for our purpose (apart from the CRUD operations endpoints), which is the endpoint used to fetch all jobs of an account: 

Endpoint: https://cloud.getdbt.com/api/v2/accounts/{accountId}/jobs/

Next, you will be performing a series of repetitive operations such as Create, Update, and Delete. If you are familiar with Object-Oriented Programming (OOP) concepts, create a class for this purpose (e.g., class dbt_JOBS():); if you are not comfortable with classes, simple functions work just as well.
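As a rough sketch of such a class (the method names mirror those referenced later in this post, such as create_job, update_job_status, and delete_job_by_name, but the exact structure and error handling are up to you):

import requests

class dbt_JOBS():
    """Thin wrapper around the dbt Cloud v2 Administrative API (sketch only)."""

    BASE_URL = "https://cloud.getdbt.com/api/v2"

    def __init__(self, account_name, headers, logger):
        self.headers = headers
        self.logger = logger
        self.account_id = self._resolve_account_id(account_name)

    def _resolve_account_id(self, account_name):
        # List all accounts visible to the API key and match on name
        res = requests.get(f"{self.BASE_URL}/accounts/", headers=self.headers).json()
        for acct in res["data"]:
            if acct["name"] == account_name:
                return acct["id"]
        raise ValueError(f"Account '{account_name}' not found")

    def getAccountID(self):
        return self.account_id

    # getProjectID / getEnvID follow the same list-and-match pattern and are omitted here

    def get_all_jobs(self):
        url = f"{self.BASE_URL}/accounts/{self.account_id}/jobs/"
        return requests.get(url, headers=self.headers).json()["data"]

    def create_job(self, body):
        url = f"{self.BASE_URL}/accounts/{self.account_id}/jobs/"
        return requests.post(url, headers=self.headers, json=body).json()["data"]["id"]

    def update_job_status(self, body):
        url = f"{self.BASE_URL}/accounts/{self.account_id}/jobs/{body['id']}/"
        return requests.post(url, headers=self.headers, json=body).json()

    def delete_job_by_name(self, name):
        for job in self.get_all_jobs():
            if job["name"] == name:
                url = f"{self.BASE_URL}/accounts/{self.account_id}/jobs/{job['id']}/"
                return requests.delete(url, headers=self.headers).json()
        self.logger.warning("No job named %s found to delete", name)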

After you have created a class (or functions) for the CRUD operations and all iterative tasks that you will perform, you are ready to move on to Step 2.

Code:

# Standard-library imports used by the main method
import json
import logging
import os
import sys
import time

import requests

# The dbt_JOBS class wrapping the CRUD endpoints is assumed to be defined above

if __name__ == "__main__":

    """
    How to Use : 3 Modes

    Difference Mode :
    Run the code without any parameters and it will run in Difference mode. It compares the local file with the online jobs that exist in dbt. You can also provide a file as an input to compare against; by default it will use 'jobs_data.json'.

    Snapshot Mode :
    Run the code with '--snapshot' to take a snapshot, at that moment, of all the online dbt jobs. You can also pass a JSON file where you want the output to go; by default the output is written to 'jobs_data.json'.

    Implementation Mode :
    Run the code with '--start' to read the differences and implement the CRUD operations (Create/Read/Update/Delete). You can pass a JSON file from which the differences will be applied; by default they are read from 'jobs_data.json'.
    """

    if not os.path.exists("Log"):
        os.makedirs("Log")

    log_file = "Log/Run_" + time.strftime("%Y_%m_%d", time.localtime()) + ".log"

    # Create a logger object
    logger = logging.getLogger()

    # Set the log level
    logger.setLevel(logging.DEBUG)

    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    # Create a console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(logging.Formatter(log_format))

    # Create a file handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(log_format))

    # Add handlers to the logger
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    args = list(sys.argv)

    # Open the JSON config file and load the data
    with open('config.json', 'r') as f:
        data = json.load(f)

    # Extract the values for the API_Key and Account_Name keys
    api_key = data[0]['API_Key']
    account_name = data[0]['Account_Name']

    headersAuth = {
        'Authorization': 'Bearer ' + str(api_key),
    }

    Lakehouse = dbt_JOBS(account_name, headersAuth, logger)

    # Default file location
    file_loc = "jobs_data.json"

Step 2: The Main Method 

You should generally start the main method by creating and adding the log handlers, optionally preceded by a brief docstring documenting the different modes of operation.

Next, read the API key and account name from a config file (an encrypted file is good practice), or use a managed secret store such as AWS Secrets Manager if you plan to run the solution in the cloud. After reading the key and account ID, you can move on to the next step.
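If you go the Secrets Manager route, a minimal sketch might look like the following; the secret name, region, and JSON shape are assumptions for illustration and mirror the config file described below:

import json
import boto3

def get_dbt_credentials(secret_name="dbt_cloud/api", region_name="us-east-1"):
    """Fetch the dbt Cloud API key and account name from AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name=region_name)
    secret = client.get_secret_value(SecretId=secret_name)
    creds = json.loads(secret["SecretString"])
    return creds["API_Key"], creds["Account_Name"]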

The config file will look like this:

config.json

[
    {
        "API_Key": "<Your API Key Here>",
        "Account_Name": "<Your Account ID Here>"
    }
]

BONUS HINT! You can either store the numeric account ID in the config or pass the explicit account name and implement a function that pulls all accounts using the API key and finds the account_ID whose name matches the value in the config file. In that case, the config file will look like this:

config.json

[
    {
        "API_Key": "<Your API Key Here>",
        "Account_Name": "<Your Account Name Here>"
    }
]
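A sketch of that name-to-ID lookup, reusing the accounts endpoint shown earlier (this is essentially the _resolve_account_id helper from the class sketch in Step 1), might look like this:

def resolve_account_id(account_name, headers):
    """Return the numeric Account ID whose name matches the config value."""
    response = requests.get('https://cloud.getdbt.com/api/v2/accounts/', headers=headers)
    for account in response.json()["data"]:
        if account["name"] == account_name:
            return account["id"]
    raise ValueError(f"No dbt Cloud account named '{account_name}' was found")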

Step 3: The Versioning Logic

In this step, the specific logic for implementation is introduced, and we will examine each of the individual modes and how to implement them.

Difference Mode Logic 

In this mode, there should either be a default JSON file, or you should supply a JSON file with the dbt job configuration you want to exist. The code fetches all jobs from the online account at that moment and compares each job with the JSON file passed. It then generates a report, which can be printed or stored in a log file, detailing which jobs need to be created, updated, or deleted.

Snapshot Mode Logic 

This mode creates a point-in-time snapshot of all dbt Cloud jobs and stores them in a JSON file. The default file is used for the output unless another JSON file is passed as an inline argument at execution time.
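A minimal sketch of this mode, using the all-jobs endpoint from Step 1 and deliberately simple file handling, could look like this:

def snapshot_jobs(account_id, headers, file_loc="jobs_data.json"):
    """Write the current online dbt Cloud job definitions to a local JSON file."""
    response = requests.get(
        f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/',
        headers=headers,
    )
    jobs = response.json()["data"]
    with open(file_loc, "w") as f:
        json.dump(jobs, f, indent=4)
    return len(jobs)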

Implementation Mode Logic 

In this mode, the code implements the changes: creating, updating, or deleting dbt Cloud jobs. Typically, you run Difference mode and then Implementation mode one after the other, provided nothing changed between the successive runs; you can either restrict login access to the web UI or run the two in quick succession to avoid undesired modifications in between. The code snippets discussed above perform all of the CRUD operations.

Code:

# Counters for the difference report
no_of_create = no_of_created = 0
no_of_update = no_of_updated = 0

for i in read_data:
    # Create new jobs
    if "id" not in i:
        logger.info("******************************")
        logger.info("Found a job to create :: ")
        i["id"] = None
        i["account_id"] = Lakehouse.getAccountID()
        i["project_id"] = Lakehouse.getProjectID(i["project_id"])
        i["environment_id"] = Lakehouse.getEnvID(i["environment_id"])
        logger.info(i)
        if implement:
            Lakehouse.create_job(i)
            no_of_created = no_of_created + 1
        no_of_create = no_of_create + 1
    else:
        # Update existing jobs
        id = i["id"]
        for j in res_data:
            if id == j["id"]:
                if i != j:
                    logger.info("******************************")
                    logger.info("Job to be updated : %s", i["name"])
                    logger.info("********** Old Job **********")
                    logger.info(j)
                    logger.info("********** New Job **********")
                    logger.info(i)
                    if implement:
                        Lakehouse.update_job_status(i)
                        no_of_updated = no_of_updated + 1
                    no_of_update = no_of_update + 1
                else:
                    logger.info(
                        "Jobs config is the same for : (Local) %s %s ::(Online)",
                        i["name"].ljust(50, ' '),
                        j["name"].rjust(50, ' '),
                    )

no_of_delete = 0
no_of_deleted = 0
# Find deleted jobs
for i in res_data:
    found = 0
    for j in read_data:
        if i["name"] == j["name"]:
            found = 1
            break
    if found == 0:
        logger.info("******************************")
        logger.info("Job will be deleted : %s", i["name"])
        no_of_delete = no_of_delete + 1
        if implement:
            Lakehouse.delete_job_by_name(i["name"])
            no_of_deleted = no_of_deleted + 1

MORE BONUS TIPS! 

This logic searches the two data structures in O(n^2) time, which is suitable for small use cases. For larger production use cases with a significant number of dbt Cloud jobs, using hashing or a dictionary keyed by job ID or name will greatly improve performance. You could also use other data structures and algorithms to compare and check efficiently.
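For example, indexing the online jobs by ID (and collecting names into sets) turns each lookup into roughly constant time; a hedged sketch against the same read_data and res_data structures used above:

# Build lookup tables once instead of scanning res_data for every local job
online_by_id = {job["id"]: job for job in res_data}
online_names = {job["name"] for job in res_data}
local_names = {job["name"] for job in read_data}

for local_job in read_data:
    online_job = online_by_id.get(local_job.get("id"))
    if online_job is None:
        ...  # candidate for creation
    elif local_job != online_job:
        ...  # candidate for update

# Jobs that exist online but not locally are candidates for deletion
to_delete = online_names - local_names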

The above code is meant to serve as a framework, or "Version 1," of the solution. Algorithmic improvements and efficiency tuning are implemented before it is deployed in a customer's production environment (i.e., Version 1 is not deployed directly).

Conclusion

You should now have a better understanding of how to version control your dbt Cloud jobs, using an approach similar to CloudFormation templates to create, update, and delete dbt jobs.

The code and logic shared in this article (after customization and improvements) are currently being used in an active customer production environment and operating efficiently without issue. The typical use case is a scheduled daily run that generates the difference report, with the implementation mode executed ad hoc.

Are you looking to add this dbt Cloud jobs solution to your modern data stack? phData can help! With a prescriptive approach to version controlling core projects, our team can implement solutions with minimal downtime and help educate developers about dbt Cloud to keep them knowledgeable, efficient, and effective.
