Python Assets

Bruin takes the Python data development experience to the next level:

  • Bruin runs assets in isolated environments: mix and match Python versions & dependencies
  • It installs & manages Python versions automatically, so you don't need anything preinstalled
  • You can return dataframes, and Bruin uploads them to your destination
  • You can run quality checks on the result, just as with a regular asset

Bruin uses the amazing uv under the hood to abstract away all the complexity.

Python assets are built to be as flexible as possible. You can use any Python package you want, as long as it is installable with pip.

bruin-python
"""@bruin
name: tier1.my_custom_api
image: python:3.13
connection: bigquery

materialization:
  type: table
  strategy: merge

columns:
  - name: col1
    type: integer
    checks:
      - name: unique
      - name: not_null
@bruin"""

import pandas as pd

def materialize():
    items = 100000
    df = pd.DataFrame({
        'col1': range(items),
        'col2': [f'value_new_{i}' for i in range(items)],
        'col3': [i * 6.0 for i in range(items)]
    })

    return df

Dependency Management

Bruin supports two ways of managing Python dependencies:

  1. pyproject.toml with uv.lock (recommended)
  2. requirements.txt (legacy)

Bruin searches for dependency files by walking up the directory tree from the asset's location to the repository root. If both requirements.txt and pyproject.toml exist in the same search path, requirements.txt takes priority for backward compatibility.

The recommended way to manage dependencies is with a standard pyproject.toml file. This gives you access to uv's full dependency resolution, including lockfile support via uv.lock.

text
my-pipeline/
    assets/
        fetch_data.py
    pyproject.toml
    uv.lock
    pipeline.yml

A minimal pyproject.toml looks like this:

toml
[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "pandas>=2.0.0",
    "requests>=2.28.0",
]

When Bruin finds a pyproject.toml, it runs uv run from the project directory, which automatically:

  • Installs dependencies defined in pyproject.toml
  • Uses uv.lock for reproducible builds if present
  • Syncs the project environment before running your asset

Locking dependencies

You can lock your dependencies to ensure reproducible builds across environments:

shell
bruin internal lock-asset-dependencies assets/fetch_data.py

When a pyproject.toml is detected, this runs uv lock in the project directory, generating or updating the uv.lock file. You should commit uv.lock to version control.

You can also run uv lock directly from the project directory:

shell
cd my-pipeline && uv lock

Using requirements.txt

You can also manage dependencies with requirements.txt files. Bruin searches for the closest requirements.txt file by walking up the directory tree from the asset's location.

For example, assume you have a file tree such as:

text
* folder1/
    * folder2/
        * test.py
        * requirements.txt
    * folder3/
        * test2.py
    * requirements.txt
* folder4/
    * folder5/
        * folder6/
            * test3.py
* requirements.txt
  • When Bruin runs test.py, it uses folder1/folder2/requirements.txt, since the two files are in the same folder.
  • For test2.py, there is no requirements.txt in the same folder, so Bruin goes up one level in the tree and finds folder1/requirements.txt.
  • Similarly, the top-level requirements.txt is used for test3.py, since none of folder6, folder5, or folder4 contains a requirements.txt file.
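The closest-file lookup above can be sketched in a few lines of Python. This is an illustration of the search behavior, not Bruin's actual implementation, and the function name find_requirements is invented here:

```python
from pathlib import Path
from typing import Optional


def find_requirements(asset_path: str, repo_root: str) -> Optional[Path]:
    """Walk up from the asset's folder to the repo root and return the
    closest requirements.txt, or None if there isn't one."""
    current = Path(asset_path).resolve().parent
    root = Path(repo_root).resolve()
    while True:
        candidate = current / "requirements.txt"
        if candidate.exists():
            return candidate
        # Stop at the repository root (or the filesystem root).
        if current == root or current.parent == current:
            return None
        current = current.parent
```

Applied to the tree above, test.py resolves to folder1/folder2/requirements.txt while test2.py walks up and resolves to folder1/requirements.txt.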

Locking dependencies

You can lock requirements.txt dependencies using:

shell
bruin internal lock-asset-dependencies assets/test.py

This runs uv pip compile to resolve and pin all dependency versions in-place.

Resolution priority

When both requirements.txt and pyproject.toml exist in the search path, Bruin uses the following priority:

  1. requirements.txt — checked first for backward compatibility
  2. pyproject.toml — used if no requirements.txt is found
  3. No dependencies — if neither file is found, the asset runs without dependency installation

Python versions

Bruin supports multiple Python versions in the same pipeline, each running in an isolated environment. Dependencies are resolved and installed per Python version, without impacting each other.

You can define Python versions using the image key:

bruin-python
"""@bruin
name: tier1.my_custom_api
image: python:3.11
@bruin"""

print('hello world')

Secrets

Bruin supports injecting connections into your Python assets as environment variables.

You can define secrets in your asset definition using the secrets key, and Bruin will automatically make them available as environment variables during execution. The injected secret is a JSON representation of the connection model.

This is useful for API keys, passwords, and other sensitive information.

bruin-python
"""@bruin
name: tier1.my_custom_api
secrets:
    - key: connection_name
@bruin"""

import os

my_secret = os.environ["connection_name"]
# Use your secret in your code

By default, secrets are injected as environment variables using the key name. If you want to inject a secret under a different environment variable name, you can use the inject_as field:

bruin-python
"""@bruin
name: tier1.my_custom_api
secrets:
    - key: connection_name
      inject_as: creds
@bruin"""

import os

my_secret = os.environ["creds"] 
# Use your secret in your code

This allows you to map a secret key to any environment variable name you prefer inside your Python code.
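Since the injected value is a JSON document, you would typically parse it before use. A minimal sketch, assuming the inject_as name creds from the example above; the available keys depend on the connection type, project_id is shown purely for illustration, and the fallback default is only there so the snippet runs outside a Bruin run:

```python
import json
import os

# The injected value is the JSON representation of the connection model.
# The fallback default here is only so the snippet runs outside Bruin.
creds = json.loads(os.environ.get("creds", '{"project_id": "demo-project"}'))

# Hypothetical field access: the available keys depend on the
# connection type defined in your Bruin configuration.
project = creds.get("project_id")
print(project)
```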

Environment Variables

Bruin injects a set of environment variables into every Python asset by default.

Builtin

The following environment variables are available in every Python asset execution:

| Environment Variable | Description |
| --- | --- |
| BRUIN_START_DATE | The start date of the pipeline run in YYYY-MM-DD format (e.g. 2024-01-15) |
| BRUIN_START_DATETIME | The start date and time of the pipeline run in YYYY-MM-DDThh:mm:ss format (e.g. 2024-01-15T13:45:30) |
| BRUIN_START_TIMESTAMP | The start timestamp of the pipeline run in RFC3339 format with timezone (e.g. 2024-01-15T13:45:30.000000+07:00) |
| BRUIN_END_DATE | The end date of the pipeline run in YYYY-MM-DD format (e.g. 2024-01-15) |
| BRUIN_END_DATETIME | The end date and time of the pipeline run in YYYY-MM-DDThh:mm:ss format (e.g. 2024-01-15T13:45:30) |
| BRUIN_END_TIMESTAMP | The end timestamp of the pipeline run in RFC3339 format with timezone (e.g. 2024-01-15T13:45:30.000000+07:00) |
| BRUIN_EXECUTION_DATE | The execution date of the pipeline run in YYYY-MM-DD format (e.g. 2024-01-15) |
| BRUIN_EXECUTION_DATETIME | The execution date and time of the pipeline run in YYYY-MM-DDThh:mm:ss format (e.g. 2024-01-15T13:45:30) |
| BRUIN_EXECUTION_TIMESTAMP | The execution timestamp of the pipeline run in RFC3339 format with timezone (e.g. 2024-01-15T13:45:30.000000+07:00) |
| BRUIN_RUN_ID | The unique identifier for the pipeline run |
| BRUIN_PIPELINE | The name of the pipeline being executed |
| BRUIN_FULL_REFRESH | Set to 1 when the pipeline is running with the --full-refresh flag, empty otherwise |
| BRUIN_THIS | The name of the Python asset |
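All of these variables are plain strings, so parse them as needed. A small sketch (the fallback defaults exist only so the snippet also runs outside a Bruin run):

```python
import os
from datetime import datetime

# Inside a Bruin run these variables are always set; the defaults
# here are only so the snippet runs standalone.
start = datetime.strptime(os.environ.get("BRUIN_START_DATE", "2024-01-15"), "%Y-%m-%d")
end = datetime.strptime(os.environ.get("BRUIN_END_DATE", "2024-01-16"), "%Y-%m-%d")

# BRUIN_FULL_REFRESH is "1" when --full-refresh is passed, empty otherwise.
full_refresh = os.environ.get("BRUIN_FULL_REFRESH") == "1"

window_days = (end - start).days
print(f"processing {window_days} day(s), full refresh: {full_refresh}")
```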

Pipeline

Bruin supports user-defined variables at the pipeline level. These are made available to your Python asset as a JSON document in the BRUIN_VARS environment variable. When no variables are defined, BRUIN_VARS is set to {}. See Variables for more information on how to define and override them, including the full list of JSON Schema type options and complementary keywords.

Here's a short example:

yaml
name: pipeline-with-variables
variables:
  target_segment:
    type: string
    enum: ["self_serve", "enterprise", "partner"]
    default: "enterprise"
  forecast_horizon_days:
    type: integer
    minimum: 7
    maximum: 90
    default: 30
  experiment_cohorts:
    type: array
    minItems: 1
    items:
      type: object
      required: [name, weight, channels]
      properties:
        name:
          type: string
        weight:
          type: number
        channels:
          type: array
          items:
            type: string
      additionalProperties: false
    default:
      - name: enterprise_baseline
        weight: 0.6
        channels: ["email", "customer_success"]
      - name: partner_campaign
        weight: 0.4
        channels: ["webinar", "email"]
bruin-python
""" @bruin
name: inspect_segments
@bruin """

import os
import json

variables = json.loads(os.environ.get("BRUIN_VARS", "{}"))

print("target_segment:", variables["target_segment"])            # target_segment: enterprise
print("forecast_horizon_days:", variables["forecast_horizon_days"])  # forecast_horizon_days: 30

for cohort in variables["experiment_cohorts"]:
    print(cohort["name"], cohort["weight"], cohort["channels"])
    # enterprise_baseline 0.6 ['email', 'customer_success']
    # partner_campaign 0.4 ['webinar', 'email']

TIP

You can override the value of variables at runtime using the --var flag.

Materialization

Bruin runs regular Python scripts by default; however, quite often teams need to load data into a destination from their Python scripts. Bruin supports materializing the data returned by a Python script into a data warehouse.

The requirements to get this working are:

  • define a materialization config in the asset definition
  • have a function called materialize in your Python script that returns a pandas/polars dataframe or a list of dicts.

WARNING

This feature was introduced recently and is not yet battle-tested. Please create an issue if you encounter any bugs.

bruin-python
"""@bruin
name: tier1.my_custom_api
image: python:3.13
connection: bigquery

materialization:
  type: table
  strategy: merge
 
columns:
  - name: col1
    primary_key: true
@bruin"""

import pandas as pd

def materialize(**kwargs):
    items = 100000
    df = pd.DataFrame({
        'col1': range(items),
        'col2': [f'value_new_{i}' for i in range(items)],
        'col3': [i * 6.0 for i in range(items)]
    })

    return df
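As noted above, materialize can also return a list of dicts instead of a dataframe, which is handy when you don't need pandas at all. A minimal sketch, with a hypothetical asset name:

```python
"""@bruin
name: tier1.my_rows
connection: bigquery

materialization:
  type: table
  strategy: merge

columns:
  - name: col1
    primary_key: true
@bruin"""

def materialize(**kwargs):
    # Each dict becomes a row; dict keys become column names.
    return [
        {"col1": i, "col2": f"value_new_{i}", "col3": i * 6.0}
        for i in range(100)
    ]
```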

Under the hood

Bruin uses Apache Arrow under the hood to store the returned data efficiently, and ingestr to upload it to the destination. The workflow goes like this:

  • install the asset dependencies using uv
  • run the materialize function of the asset
  • save the returned data into a temporary file using Arrow memory-mapped files
  • run ingestr to load the Arrow memory-mapped file into the destination
  • delete the memory-mapped file

This flow ensures that the typing information gathered from the dataframe will be preserved when loading to the destination, and it supports incremental loads, deduplication, and all the other features of ingestr.

Enforcing column types

By default, ingestr infers column types from the dataframe. If you want to enforce specific column types in the destination table, you can use the enforce_schema parameter along with column definitions:

bruin-python
"""@bruin
name: tier1.users_api
image: python:3.11
connection: bigquery

materialization:
  type: table
  strategy: merge

parameters:
  enforce_schema: true

columns:
  - name: id
    type: integer
    primary_key: true
  - name: name
    type: string
  - name: email
    type: string
  - name: created_at
    type: timestamp
@bruin"""

import pandas as pd

def materialize():
    # Fetch data from API
    return pd.DataFrame({
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com'],
        'created_at': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03'])
    })

When enforce_schema: true is set, Bruin passes the column type hints to ingestr, ensuring the destination table schema matches your definition rather than relying on type inference.

Column-level lineage

Bruin supports column-level lineage for Python assets as well as SQL assets. To get column-level lineage, annotate the columns exposed by the Bruin asset.

bruin-python
""" @bruin
name: myschema.my_mat_asset 
materialization:
  type: table
  strategy: merge

columns:
  - name: col1
    type: int
    upstreams:
      - table: xyz
        column: col1

@bruin """

import pandas as pd

def materialize():
    items = 100000
    df = pd.DataFrame({
        'col1': range(items),
        'col2': [f'value_new_{i}' for i in range(items)],
        'col3': [i * 6.0 for i in range(items)]
    })

    return df

Bruin uses these annotations to build column-level lineage across all of your assets, including the lineage extracted automatically from SQL.

Examples

bruin-python
""" @bruin
name: hello_world
@bruin """

print("Hello World!")

Ingest data to BigQuery via an API manually

bruin-python
""" @bruin
name: raw_data.currency_rates
type: python
parameters:
    loader_file_format: jsonl
secrets:
    - key: bigquery_conn
@bruin """

import os
import currency_rates
import pandas as pd
import json
from google.cloud import bigquery

# Bruin injects secrets as a JSON string.
# This function takes a connection name and returns a BigQuery client
def get_bq_client(conn_name: str) -> bigquery.Client:
    serv_acc = json.loads(os.environ[conn_name])
    return bigquery.Client.from_service_account_info(
        json.loads(serv_acc["service_account_json"]), 
        project=serv_acc["project_id"]
    )

START_DATE = os.environ["BRUIN_START_DATE"]
END_DATE = os.environ["BRUIN_END_DATE"]

bq_client = get_bq_client("bigquery_conn")
df = currency_rates.get_rates(start=START_DATE, end=END_DATE)

# Requires the pandas-gbq package; reuses the client's credentials to write the dataframe.
df.to_gbq("raw_data.currency_rates", if_exists="replace", credentials=bq_client._credentials)