AWS EMR Serverless Spark
Amazon EMR (Elastic MapReduce) Serverless is a deployment option for Amazon EMR that provides a serverless runtime environment. This simplifies the operation of analytics applications that use the latest open-source frameworks, such as Apache Spark and Apache Hive. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.
Bruin supports EMR Serverless as a data platform. You can use Bruin to integrate your Spark workloads into complex pipelines that use different data technologies, all without leaving your terminal.
Connection
In order to use Bruin to run Spark jobs on EMR Serverless, you need to define an emr_serverless connection in your .bruin.yml file. Here's a sample .bruin.yml with the required fields defined:
environments:
  default:
    connections:
      emr_serverless:
        # name of your connection
        - name: emr_serverless-default
          # AWS credentials
          access_key: AWS_ACCESS_KEY_ID
          secret_key: AWS_SECRET_ACCESS_KEY
          region: eu-north-1
          # name of your EMR application
          application_id: EMR_APPLICATION_ID
          # role assumed by your job. This determines
          # what AWS resources your Spark job can access.
          execution_role: IAM_ROLE_ARN
          # (Python assets only)
          # declares the working area used by PySpark jobs.
          workspace: s3://your-bucket/optional-prefix/
Logs
Bruin supports log streaming for Spark jobs. This is only supported for Spark logs stored in S3. Both DRIVER and EXECUTOR logs are streamed by default.
In order to stream logs, one of the following conditions must be met:
- Your EMR Serverless Application is pre-configured with an S3 log storage location.
- parameters.logs is defined on the asset (see the example below).
NOTE
Python assets stream logs out-of-the-box. You don't need to specify parameters.logs for them.
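For reference, here's a minimal sketch of a YAML asset that sets parameters.logs explicitly; the asset name, bucket, and script path below are placeholders:
name: spark_job_with_logs
type: emr_serverless.spark
connection: emr_serverless-default
parameters:
  entrypoint: s3://your-bucket/src/script.py
  # S3 path where the job's logs are (or should be) stored
  logs: s3://your-bucket/logs/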
EMR Serverless Assets
Bruin supports two different ways of defining a Spark asset:
- a "managed" PySpark asset, where Bruin takes care of delivering your code to the cluster as well as running it
- an external asset defined in YAML, where Bruin simply orchestrates the job
emr_serverless.pyspark
A fully managed option where Bruin takes care of job setup, configuration, and execution. You only need to define the workload logic.
- Supports only PySpark scripts.
- Handles artifact deployment.
- Automatic log configuration.
- Concurrent-safe by default.
- Bundles internal dependencies and configures your job to use them.
Example: Standalone script
""" @bruin
name: pyspark_job
type: emr_serverless.pyspark
connection: app_staging
@bruin """
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark_session = SparkSession.builder.appName("jobName").getOrCreate()
run_workload(spark_session)
spark_session.stop()
def run_workload(session):
"""
crunch some numbers
"""
...
This defines a PySpark asset that will be executed by the EMR Serverless Application defined by the connection named app_staging. The run_workload function is there for demonstration; you can structure your PySpark scripts however you like.
Example: Multi-module script
Advanced Spark users often package core logic into reusable libraries to improve consistency, reduce duplication, and streamline development across jobs. This approach ensures that shared transformations, validations, and business rules are implemented once and reused reliably.
Bruin has seamless support for PySpark modules.
For this example, let's assume this is how your Bruin pipeline is structured:
acme_pipeline/
├── assets
│   └── main.py
├── lib
│   └── core.py
└── pipeline.yml
Let's say that acme_pipeline/lib/core.py stores some common routines used throughout your jobs. For this example, we'll create a function called sanitize that takes a Spark DataFrame and sanitizes its columns (a common operation in data analytics).
from pyspark.sql import DataFrame


def sanitize(df: DataFrame):
    """
    sanitize a dataframe
    """
    ...
You can now import this package in your PySpark assets.
""" @bruin
name: raw.transaction
type: emr_serverless.pyspark
connection: app_staging
@bruin """
from acme_pipeline.lib.core import sanitize
from pyspark.sql import SparkSession
if __name__ == "__main__":
session = SparkSession.builder.appName("raw.transaction_std").getOrCreate()
src = session.sparkContext.textFile("s3://acme/data/transactions").toDF()
sanitize(src)
session.stop()
Bruin internally sets the PYTHONPATH to the root of your pipeline, so you'll always have to use the fully qualified package name to import any internal packages.
Workspace
Python assets require workspace to be configured in your emr_serverless connection. The workspace is an S3 path that Bruin uses as working storage for jobs that run on EMR Serverless.
Bruin uses this for:
- Storing logs
- Staging your entrypoint file
- Uploading bundled dependencies
emr_serverless.spark
A lightweight option that only supports triggering a job.
- Supports both PySpark scripts and JARs.
- Users are responsible for:
  - deploying their artifacts
  - managing internal dependencies
Choose the format that best fits your use case: use YAML when you want to integrate with pre-existing infrastructure, or use Python for a streamlined, fully managed experience.
Example
name: spark_example_job
type: emr_serverless.spark
connection: app_staging
parameters:
  entrypoint: s3://amzn-test-bucket/src/script.py
  config: --conf spark.executor.cores=1
This defines an asset that runs a Spark job on the EMR Serverless Application defined by the connection named app_staging. The script at s3://amzn-test-bucket/src/script.py is configured as the entrypoint of the job.
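emr_serverless.spark assets can also point at a JAR you have deployed yourself. Here's a minimal sketch with a hypothetical bucket, JAR path, and main class; passing the main class through config along with other Spark submit parameters is an assumption here, not a documented guarantee:
name: spark_jar_example_job
type: emr_serverless.spark
connection: app_staging
parameters:
  # hypothetical JAR artifact deployed and managed by the user
  entrypoint: s3://amzn-test-bucket/artifacts/etl-job.jar
  # hypothetical main class; assumes config is forwarded as Spark submit parameters
  config: --class com.example.etl.Main --conf spark.executor.cores=1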
NOTE
YAML and Python assets require different type parameters:
- YAML-style assets: emr_serverless.spark
- Python assets: emr_serverless.pyspark
Quality Checks
Quality checks for EMR Serverless are powered by AWS Athena.
WARNING
Bruin currently requires a few extra steps in order to be able to run quality checks for your EMR Serverless Assets. Future versions of Bruin will automate this process for you.
Prerequisites
- Configure an athena connection in your .bruin.yml.
- Set parameters.athena_connection to the name of your Athena connection.
- Create an athena table on top of your data with the same name as your asset.
Example
To demonstrate quality checks, we're going to write a simple PySpark script that writes static data as CSV to a bucket in S3.
Initial Setup
We're going to start by creating a pipeline called quality-checks-example. We first run bruin init to create the skeleton structure.
bruin init default quality-checks-example
Now we'll add the PySpark asset.
""" @bruin
name: users
type: emr_serverless.pyspark
@bruin """
from pyspark.sql import SparkSession
SCHEMA = ["id", "name", "age"]
USERS = [
(1, "Alice", 29),
(2, "Bob", 31),
(3, "Cathy", 25),
(4, "David", 35),
(5, "Eva", 28),
]
if __name__ == "__main__":
spark_session = SparkSession.builder.appName("users").getOrCreate()
df = spark.createDataFrame(USERS, SCHEMA)
df.write.csv("s3://acme/user/list", mode="overwrite")
spark.stop()
Next, let's set up the .bruin.yml file with the credentials necessary to run our job.
environments:
  default:
    connections:
      emr_serverless:
        - name: emr_serverless-default
          access_key: AWS_ACCESS_KEY_ID
          secret_key: AWS_SECRET_ACCESS_KEY
          region: eu-north-1
          application_id: EMR_APPLICATION_ID
          execution_role: IAM_ROLE_ARN
          workspace: s3://acme/bruin-pyspark-workspace/
We can now run the pipeline to verify that it works:
bruin run ./quality-checks-example
Output
Analyzed the pipeline 'quality-checks-example' with 1 assets.
Pipeline: quality-checks-example
No issues found
✓ Successfully validated 1 assets across 1 pipeline, all good.
Starting the pipeline execution...
[2025-05-11 18:10:00] Running: users
[2025-05-11 18:10:00] ... output omitted for brevity ...
[2025-05-11 18:10:05] Finished: users
Executed 1 tasks in 5s
Enabling quality checks
In order to run quality checks, we need to:
- Create an Athena table in our AWS account.
- Configure an athena connection in our .bruin.yml file.
Let's first create our Athena table. Go to your AWS Athena console and run the following DDL query to create a table over the data our PySpark job created.
CREATE EXTERNAL TABLE users (id int, name string, age int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://acme/user/list'
TIP
For more information on creating Athena tables, see Create tables in Athena and Use SerDe in the AWS Athena Documentation.
Next, update your .bruin.yml file with an athena connection.
environments:
  default:
    connections:
      emr_serverless:
        - name: emr_serverless-default
          access_key: AWS_ACCESS_KEY_ID
          secret_key: AWS_SECRET_ACCESS_KEY
          region: eu-north-1
          application_id: EMR_APPLICATION_ID
          execution_role: IAM_ROLE_ARN
          workspace: s3://acme/bruin-pyspark-workspace/
      athena:
        - name: quality-tests
          access_key_id: AWS_ACCESS_KEY_ID
          secret_access_key: AWS_SECRET_ACCESS_KEY
          region: eu-north-1
          query_results_path: "s3://acme/athena-output/"
Now we can update our asset to define some quality checks. For this example, we're going to add one column check and one custom check.
""" @bruin
name: users
type: emr_serverless.pyspark
columns:
- name: id
type: integer
checks:
- non_negative
custom_checks:
- name: users are adults
query: SELECT count(*) from users where age < 18
value: 0
parameters:
athena_connection: quality-tests
@bruin """
from pyspark.sql import SparkSession
SCHEMA = ["id", "name", "age"]
USERS = [
(1, "Alice", 29),
(2, "Bob", 31),
(3, "Cathy", 25),
(4, "David", 35),
(5, "Eva", 28),
]
if __name__ == "__main__":
spark_session = SparkSession.builder.appName("users").getOrCreate()
df = spark.createDataFrame(USERS, SCHEMA)
df.write.csv("s3://acme/user/list", mode="overwrite")
spark.stop()
TIP
If all your assets share the same type and parameters.athena_connection, you can set them as defaults in your pipeline.yml to avoid repeating them for each asset.
name: my-pipeline
default:
  type: emr_serverless.pyspark
  parameters:
    athena_connection: quality-tests
Now when we run our Bruin pipeline again, our quality checks should run after the asset run finishes.
bruin run ./quality-checks-example
Output
Analyzed the pipeline 'quality-checks-example' with 1 assets.
Pipeline: quality-checks-example
No issues found
✓ Successfully validated 1 assets across 1 pipeline, all good.
Starting the pipeline execution...
[2025-05-11 18:20:36] Running: users
[2025-05-11 18:20:36] ... output omitted for brevity ...
[2025-05-11 18:21:01] Finished: users
[2025-05-11 18:21:02] Running: users:id:non_negative
[2025-05-11 18:21:02] Running: users:custom-check:users_are_adults
[2025-05-11 18:21:06] Finished: users:id:non_negative
[2025-05-11 18:21:06] Finished: users:custom-check:users_are_adults
Executed 3 tasks in 30s
Asset Schema
Here's the full schema of the emr_serverless.spark asset along with a brief explanation:
# required
name: spark_submit_test
# required, should be one of:
# - emr_serverless.spark (YAML)
# - emr_serverless.pyspark (Python)
type: emr_serverless.spark
# optional, defaults to emr_serverless-default
connection: connection-name-example
# job-specific configuration
parameters:
  # path of the PySpark script or JAR to run (required) [YAML only]
  entrypoint: s3://amzn-test-bucket/src/script.py
  # path where logs are stored or should be stored (optional)
  logs: s3://amzn-test-bucket/logs
  # name of your Athena connection (optional, defaults to "athena-default")
  # used for quality checks
  athena_connection: athena-conn
  # args to pass to the entrypoint (optional)
  args: arg1 arg2
  # Spark configuration (optional)
  config: --conf spark.executor.cores=1
  # timeout for the job, defaults to 0 which means no time limit (optional)
  timeout: 10m
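For emr_serverless.pyspark assets, the same metadata goes into the @bruin header of your Python file, and no entrypoint is needed since Bruin stages the script for you. Here's a minimal sketch of the header contents, limited to the fields shown elsewhere on this page (the names are placeholders):
# required
name: pyspark_schema_example
# required for Python assets
type: emr_serverless.pyspark
# optional, defaults to emr_serverless-default
connection: connection-name-example
parameters:
  # name of your Athena connection, used for quality checks (optional)
  athena_connection: athena-conn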
For more details on EMR Serverless applications, see the AWS Documentation.