AWS EMR Serverless Spark

Amazon EMR (Elastic MapReduce) Serverless is a deployment option for Amazon EMR that provides a serverless runtime environment. This simplifies the operation of analytics applications that use the latest open-source frameworks, such as Apache Spark and Apache Hive. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Bruin supports EMR Serverless as a data platform. You can use Bruin to integrate your Spark workloads into complex pipelines that use different data technologies, all without leaving your terminal.

Connection

To run Spark jobs on EMR Serverless with Bruin, you need to define an emr_serverless connection in your .bruin.yml file. Here's a sample .bruin.yml with the required fields defined.

yaml
environments:
  default:
    connections:
      emr_serverless:

        # name of your connection
      - name: emr_serverless-default

        # AWS credentials
        access_key: AWS_ACCESS_KEY_ID
        secret_key: AWS_SECRET_ACCESS_KEY
        region: eu-north-1

        # name of your EMR application
        application_id: EMR_APPLICATION_ID

        # role assumed by your job. This determines
        # what AWS resources your spark job can access.
        execution_role: IAM_ROLE_ARN

        # (Python assets only)
        # declares working area used by pyspark jobs.
        workspace: s3://your-bucket/optional-prefix/

Logs

Bruin supports log streaming for Spark jobs. Streaming is only supported for Spark logs stored in S3. Both DRIVER and EXECUTOR logs are streamed by default.

In order to stream logs, one of the following conditions must be met:

  • Your EMR Serverless Application is pre-configured with an S3 Log Storage Location.
  • parameters.logs must be defined on the asset (see the sketch below)
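If your application has no pre-configured log location, a minimal YAML asset sketch that sets parameters.logs explicitly might look like this (bucket and prefix are placeholders):

yaml
name: spark_job_with_logs
type: emr_serverless.spark
connection: emr_serverless-default
parameters:
  entrypoint: s3://your-bucket/src/script.py
  # Bruin streams DRIVER and EXECUTOR logs from this location
  logs: s3://your-bucket/spark-logs/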

NOTE

Python assets stream logs out-of-the-box. You don't need to specify parameters.logs for them.

EMR Serverless Assets

Bruin supports two different ways of defining a Spark asset:

  • a "managed" PySpark asset, where Bruin also takes care of delivering your code to the cluster
  • an external asset defined in YAML, where Bruin simply orchestrates a job you have already deployed

emr_serverless.pyspark

A fully managed option where Bruin takes care of job setup, configuration, and execution. You only need to define the workload logic.

  • Supports only PySpark scripts.
  • Handles artifact deployment.
  • Automatic log configuration.
  • Concurrent-safe by default.
  • Bundles internal dependencies and configures your job to use them.

Example: Standalone script

bruin-python
""" @bruin
name: pyspark_job
type: emr_serverless.pyspark
connection: app_staging
@bruin """

from pyspark.sql import SparkSession

# run_workload must be defined before the entrypoint guard below calls it
def run_workload(session):
  """
    crunch some numbers
  """
  ...

if __name__ == "__main__":
  spark_session = SparkSession.builder.appName("jobName").getOrCreate()
  run_workload(spark_session)
  spark_session.stop()

This defines a PySpark asset that will be executed by the EMR Serverless application defined by the connection named app_staging.

The run_workload function is there for demonstration. You can structure your PySpark scripts however you like.

Example: Multi-module script

Advanced Spark users often package core logic into reusable libraries to improve consistency, reduce duplication, and streamline development across jobs. This approach ensures that shared transformations, validations, and business rules are implemented once and reused reliably.

Bruin has seamless support for PySpark modules.

For this example, let's assume this is how your bruin pipeline is structured:

acme_pipeline/
├── assets
│   └── main.py
├── lib
│   └── core.py
└── pipeline.yml

Let's say that acme_pipeline/lib/core.py stores some common routines used throughout your jobs. For this example, we'll create a function called sanitize that takes a Spark DataFrame and sanitizes its columns (a common operation in data analytics).

python
from pyspark.sql import DataFrame

def sanitize(df: DataFrame):
  """
  sanitize a dataframe
  """
  ...

You can now import this package in your PySpark assets.

bruin-python
""" @bruin
name: raw.transaction
type: emr_serverless.pyspark
connection: app_staging
@bruin """

from acme_pipeline.lib.core import sanitize
from pyspark.sql import SparkSession

if __name__ == "__main__":
  session = SparkSession.builder.appName("raw.transaction_std").getOrCreate()
  # read the raw transaction records into a DataFrame
  src = session.read.text("s3://acme/data/transactions")
  sanitize(src)
  session.stop()

Bruin internally sets PYTHONPATH to the root of your pipeline, so you must always use the fully qualified package name to import internal packages.
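As a quick illustration, with the acme_pipeline layout above, imports resolve from the pipeline root:

python
# resolved relative to the pipeline root, which Bruin puts on PYTHONPATH
from acme_pipeline.lib.core import sanitize

# this would fail with ModuleNotFoundError, because "lib" is not a
# top-level package on PYTHONPATH:
# from lib.core import sanitize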

Workspace

Python assets require workspace to be configured in your emr_serverless connection. The workspace is an S3 path that Bruin uses as working storage for jobs that run on EMR Serverless.

Bruin uses this for:

  • Storing logs
  • Staging your entrypoint file
  • Uploading bundled dependencies

workspace diagram

emr_serverless.spark

A lightweight option that only supports triggering a job.

  • Supports both PySpark scripts and JARs.
  • Users are responsible for:
    • deploying their artifacts
    • managing internal dependencies

Choose the format that best fits your use case: use YAML when you want to integrate with pre-existing infrastructure, or use Python for a streamlined, fully managed experience.

Example

yaml
name: spark_example_job
type: emr_serverless.spark
connection: app_staging
parameters:
  entrypoint: s3://amzn-test-bucket/src/script.py
  config: --conf spark.executor.cores=1

This defines an asset that runs a Spark job on the EMR Serverless application defined by the connection named app_staging. The script at s3://amzn-test-bucket/src/script.py is configured as the entrypoint of the job.
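Since emr_serverless.spark also accepts JARs, here's a hedged sketch of a JAR-based asset, assuming the artifact is already deployed to S3 and its manifest declares a main class (the bucket, JAR name, and arguments are placeholders):

yaml
name: spark_jar_job
type: emr_serverless.spark
connection: app_staging
parameters:
  # a JAR you have already uploaded to S3
  entrypoint: s3://amzn-test-bucket/artifacts/etl-job.jar
  # arguments forwarded to the job
  args: 2025-05-11 full-refresh
  config: --conf spark.executor.memory=4g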

NOTE

YAML and Python assets require different type parameters:

  • YAML-style assets: emr_serverless.spark
  • Python assets: emr_serverless.pyspark

Quality Checks

Quality checks for EMR Serverless are powered by AWS Athena.

WARNING

Bruin currently requires a few extra steps to run quality checks for your EMR Serverless assets. Future versions of Bruin will automate this process for you.

Prerequisites

  • Configure an athena connection in your .bruin.yml.
  • Set parameters.athena_connection to the name of your Athena connection.
  • Create an Athena table on top of your data with the same name as your asset.

Example

To demonstrate quality checks, we're going to write a simple PySpark script that writes static data as CSV to a bucket in S3.

Initial Setup

We're going to start by creating a pipeline called quality-checks-example. We first run bruin init to create the skeleton structure.

sh
bruin init default quality-checks-example

Now we'll add the PySpark asset.

bruin-python
""" @bruin
name: users
type: emr_serverless.pyspark
@bruin """

from pyspark.sql import SparkSession

SCHEMA = ["id", "name", "age"]
USERS = [
  (1, "Alice", 29),
  (2, "Bob", 31),
  (3, "Cathy", 25),
  (4, "David", 35),
  (5, "Eva", 28),
]

if __name__ == "__main__":
  spark = SparkSession.builder.appName("users").getOrCreate()
  df = spark.createDataFrame(USERS, SCHEMA)
  df.write.csv("s3://acme/user/list", mode="overwrite")
  spark.stop()

Next, let's set up the .bruin.yml file with the credentials necessary to run our job.

yaml
environments:
  default:
    connections:
      emr_serverless:
      - name: emr_serverless-default
        access_key: AWS_ACCESS_KEY_ID
        secret_key: AWS_SECRET_ACCESS_KEY
        region: eu-north-1
        application_id: EMR_APPLICATION_ID
        execution_role: IAM_ROLE_ARN
        workspace: s3://acme/bruin-pyspark-workspace/

We can now run the pipeline to verify that it works:

sh
bruin run ./quality-checks-example

Output

Analyzed the pipeline 'quality-checks-example' with 1 assets.

Pipeline: quality-checks-example
  No issues found

✓ Successfully validated 1 assets across 1 pipeline, all good.

Starting the pipeline execution...
[2025-05-11 18:10:00] Running:  users
[2025-05-11 18:10:00] ... output omitted for brevity ...
[2025-05-11 18:10:05] Finished: users

Executed 1 tasks in 5s

Enabling quality checks

In order to run quality checks, we need to:

  1. Create an Athena table in our AWS account.
  2. Configure an athena connection in our .bruin.yml file.

To start, let's create our Athena table. Go to your AWS Athena console and run the following DDL query to create a table over the data our PySpark job created.

sql
CREATE EXTERNAL TABLE users (id int, name string, age int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://acme/user/list'

TIP

For more information on creating Athena tables, see Create tables in Athena and Use SerDe in the AWS Athena Documentation.
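Optionally, you can sanity-check the new table from the Athena console before wiring it into Bruin:

sql
SELECT id, name, age FROM users LIMIT 5;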

Next, update your .bruin.yml file with an athena connection.

yaml
environments:
  default:
    connections:
      emr_serverless:
      - name: emr_serverless-default
        access_key: AWS_ACCESS_KEY_ID
        secret_key: AWS_SECRET_ACCESS_KEY
        region: eu-north-1
        application_id: EMR_APPLICATION_ID
        execution_role: IAM_ROLE_ARN
        workspace: s3://acme/bruin-pyspark-workspace/
      athena:                                           
      - name: quality-tests
        access_key_id: AWS_ACCESS_KEY_ID
        secret_access_key: AWS_SECRET_ACCESS_KEY
        region: eu-north-1
        query_results_path: "s3://acme/athena-output/"

Now we can update our asset to define some quality checks. For this example, we're going to add one column check and one custom check.

bruin-python
""" @bruin
name: users
type: emr_serverless.pyspark
columns:              
  - name: id
    type: integer
    checks:           
      - non_negative
custom_checks: 
  - name: users are adults
    query: SELECT count(*) from users where age < 18
    value: 0
parameters: 
  athena_connection: quality-tests
@bruin """

from pyspark.sql import SparkSession

SCHEMA = ["id", "name", "age"]
USERS = [
  (1, "Alice", 29),
  (2, "Bob", 31),
  (3, "Cathy", 25),
  (4, "David", 35),
  (5, "Eva", 28),
]

if __name__ == "__main__":
  spark = SparkSession.builder.appName("users").getOrCreate()
  df = spark.createDataFrame(USERS, SCHEMA)
  df.write.csv("s3://acme/user/list", mode="overwrite")
  spark.stop()

TIP

If all your assets share the same type and parameters.athena_connection, you can set them as defaults in your pipeline.yml to avoid repeating them for each asset.

yaml
name: my-pipeline
default:
  type: emr_serverless.pyspark
  parameters:
    athena_connection: quality-tests

Now when we run our bruin pipeline again, our quality checks should run after our Asset run finishes.

sh
bruin run ./quality-checks-example

Output

Analyzed the pipeline 'quality-checks-example' with 1 assets.

Pipeline: quality-checks-example
  No issues found

✓ Successfully validated 1 assets across 1 pipeline, all good.

Starting the pipeline execution...
[2025-05-11 18:20:36] Running:  users
[2025-05-11 18:20:36] ... output omitted for brevity ...
[2025-05-11 18:21:01] Finished: users
[2025-05-11 18:21:02] Running:  users:id:non_negative
[2025-05-11 18:21:02] Running:  users:custom-check:users_are_adults
[2025-05-11 18:21:06] Finished: users:id:non_negative
[2025-05-11 18:21:06] Finished: users:custom-check:users_are_adults

Executed 3 tasks in 30s

Asset Schema

Here's the full schema of EMR Serverless assets along with a brief explanation of each field:

yaml
# required
name: spark_submit_test

# required, should be one of 
#   - emr_serverless.spark    (yaml)
#   - emr_serverless.pyspark  (python)
type: emr_serverless.spark 

# optional, defaults to emr_serverless-default
connection: connection-name-example  

# job specific configuration
parameters:

  # path of the pyspark script or jar to run (required) [yaml only]
  entrypoint: s3://amzn-test-bucket/src/script.py   

  # path where logs are stored or should be stored (optional)
  logs: s3://amzn-test-bucket/logs

  # name of your athena connection (optional, defaults to "athena-default")
  # used for quality checks
  athena_connection: athena-conn

  # args to pass to the entrypoint (optional)
  args: arg1 arg2

  # spark configuration (optional)
  config: --conf spark.executor.cores=1

  # timeout for the job, defaults to 0 which means no time limit (optional)
  timeout: 10m

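If your entrypoint is a PySpark script, the values in args should arrive as ordinary program arguments, the same way spark-submit passes them. A minimal sketch of an entrypoint that reads them (argument names are illustrative):

python
import sys

if __name__ == "__main__":
  # with `args: arg1 arg2`, sys.argv[1:] == ["arg1", "arg2"]
  print(sys.argv[1:])
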
For more details on EMR Serverless applications, see the AWS documentation.