Run Spark Applications on AWS Fargate

February 18, 2021
[Spark] [Fargate] [AWS] [Docker]

Too many cooks spoil the omelette

Overview

In this post, we are going to review how to run a Spark application, as a single node Fargate task. If you are familiar with Spark and its potential workload, you might wonder: “why would anyone need to run Spark applications over AWS Fargate, instead of a proper cluster”.

Well, to be honest, that's a valid question, however, there is a profusion of deployment cases where the application doesn't really need a full-blown cluster. Two immediate examples that come to mind are:

  • Running functional tests on an application.
  • Running a streaming job to download data once a day.Check this blog post by Databricks as a testament of the benefits of such a process.

In both of these scenarios, it's quite normal that the data involved are not particularly big in terms of size, and using a cluster, maybe more like an overkill.

Table of Contents

Why Fargate?

At this point, you might even ask: why Fargate? For me, it's simply because it doesn't require me to decide about cluster size, instance types, and resource management. 😉

However, there are two considerations we need to keep in mind here:

  1. Fargate, like any other AWS service, can turn into a budget black hole. It's always wise to decide about the platform, keeping this simple fact in mind and choose around the usage type.
  2. The solution we are reviewing here is based on a Docker container. So the overall concept would be possible to run on any platform that runs Docker.

We need Spark 3 + Hadoop 3!

Running a Spark app inside a container, with proper access management for AWS wasn't as easy as we are going to review here. With Hadoop 2.7 (packaged with Spark versions prior to version 3), the bundled AWS SDK library, was version 1.7.4 (released back in 2016), and couldn't properly access S3 credentials from the ECS task execution role.

Fortunately, with the release of Spark 3, this issue is resolved, and it's possible for the task containers to utilize the task's IAM roles.

Create the Container

Without further ado, let's start. First, we need to create a Docker image, with all the necessary tools to run a Spark application. If you can't wait and want to try it right away, you can head over to DataChef's SparGate project, which is the deployed version of the image we are going to discuss here.

For this project, we are going to use Alpine Linux as the base image, and we need to take care of some small details to make it usable to run our application.

Install Dependencies

We have 3 basic dependency that needs to be installed on the image:

RUN apk add openjdk8-jre bash snappy

The odd thing for me here was the requirement of bash, which is required by spark-shell. The other two, however, are ordinary dependencies we already know.

Configure Snappy

By now, we have everything we need for the Spark application to run. However, as soon as we try to write a parquet file with snappy compression, we will get the following error:

Caused by: java.lang.UnsatisfiedLinkError: /tmp/snappy-1.1.7-4479af19-a22c-461a-8e67-e526dea3a9d8-libsnappyjava.so: Error loading shared library ld-linux-x86-64.so.2: No such file or directory (needed by /tmp/snappy-1.1.7-4479af19-a22c-461a-8e67-e526dea3a9d8-libsnappyjava.so)

We can resolve it by adding the following lines to the Docker file:

# install libc6-compat and link it
RUN apk add libc6-compat
RUN ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2

Install Spark

Well, this part is quite straightforward, we just need to download a pre-built version and decompress it into the proper path:

# Install spark
ADD ${SPARK_PACKAGE_URL} /tmp/
RUN tar xvf /tmp/$SPARK_PACKAGE -C /opt
RUN ln -vs /opt/spark* /opt/spark

Install AWS Dependencies

In order to make Spark able to communicate with S3, we need to install hadoop-aws together with aws-java-sdk-bundle. Here is how we do it:

# download required aws jars into class path

ADD $HADOOP_JAR  /opt/spark/jars/
ADD $AWS_SDK_JAR /opt/spark/jars/

Spark Configuration

There is one more step for us to wrap up the Dockerfile and that's instructing Spark on where to find the required S3 credentials. For that, we create a file called spark-defaults.conf with the following content:

spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper

As you know this file can be also used to provide other configurations in the future. Now we need to configure our Spark instance to use this configuration:

COPY spark-defaults.conf /opt/spark/conf/spark-defaults.conf

Complete Docker Spec

Here you can see how the final Dockerfile looks like:

# Dockerfile

FROM alpine:3.12

ARG HADOOP_VERSION=3.2.0
ARG HADOOP_VERSION_SHORT=3.2
ARG SPARK_VERSION=3.0.1

ARG AWS_SDK_VERSION=1.11.375
ARG SPARK_PACKAGE=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION_SHORT}.tgz
ARG SPARK_PACKAGE_URL=https://downloads.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_PACKAGE}
ARG HADOOP_JAR=https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
ARG AWS_SDK_JAR=https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar

# Install dependencies

RUN apk update
RUN apk add openjdk8-jre bash snappy

# Install spark

ADD ${SPARK_PACKAGE_URL} /tmp/
RUN tar xvf /tmp/$SPARK_PACKAGE -C /opt
RUN ln -vs /opt/spark* /opt/spark
COPY spark-defaults.conf /opt/spark/conf/spark-defaults.conf

ADD $HADOOP_JAR  /opt/spark/jars/
ADD $AWS_SDK_JAR /opt/spark/jars/

# Fix snappy library load issue
RUN apk add libc6-compat
RUN ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2

# Cleanup
RUN rm -rfv /tmp/*

# Add Spark tools to path
ENV PATH="/opt/spark/bin/:${PATH}"
CMD ["spark-shell"]

Publish the Image

You can publish this image into any container registry of your choice. Usually, it's better to deploy it into ECR, which allows you to run your container without a public IP address. However, since I wanted to share a ready-to-use version of the image with the readers, I’m going to use Docker hub to keep things simple.

The Application

The following Spark application is going to simply read in a JSON file, add a date partition to it and finally store the result as a parquet file on the destination path. We are going to use S3 to store the application’s jar file, together with input and output data files 😎:


package simple

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._

object SimpleSpark {
  private val parquetFmt = "parquet"
  protected lazy val spark: SparkSession =
    SparkSession.builder
      .enableHiveSupport()
      .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
      .config("spark.hadoop.fs.s3a.fast.upload", "true")
      .config(new SparkConf)
      .getOrCreate()

  object Columns {
    val date = "date"
  }

  def main(args: Array[String]): Unit = {
    val arguments = Arguments(args)
    val df = spark.read
      .option("multiline", true)
      .json(arguments.sourcePath)
      .withColumn(Columns.date, current_date)

    df.repartition(col(Columns.date))
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy(Columns.date)
      .format(parquetFmt)
      .option("path", arguments.destPath)
      .save()
  }
}

You can find the full version of the source code for this application, over DataChef's Github.

Deployment

ECS Cluster

I'm going to use an ECS cluster for this blog post (the other option to run Fargate is EKS, and yeah, it's called “cluster”, but it's way different from what we usually mean by when we use the word “clusters” 😊).

$ aws ecs create-cluster SparGate

I'm also going to create a log group, into which will I’ll point our task's log stream:

$ aws logs create-log-group --log-group-name SimpleSpark

ECS Task

Now it's time to create the actual task and attach it to our cluster and log stream:

$ aws ecs register-task-definition --family SimpleSpark \\
    --cpu 512 --memory 2048 \\
    --requires-compatibilities FARGATE \\                       # 1
    --network-mode awsvpc \\                                    # 2
    --task-role-arn arn:aws:iam::xxx:role/task-spargate \\      # 3
    --execution-role-arn arn:aws:iam::xxx:role/exec-spargate \\ # 4
    --container-definitions '[{
  "image": "datachefhq/spargate:latest",                       # 5
  "logConfiguration": {
    "logDriver": "awslogs",
    "options": { 
      "awslogs-group" : "SimpleSpark",                         # 6
      "awslogs-region": "eu-west-1",
      "awslogs-stream-prefix": "ecs"
    }
  },
  "name": "SimpleSpark"
}]'

Let's take a step back and see where we're at:

  1. Obviously, the first thing we configured here is for the task to run on Fargate, instead of an EC2 instance fleet. This means only tasks with FARGATE launch type would be able to run on this cluster.
  2. Then we configured the network mode to aws-vpc which is the only mode that Fargate tasks can run on.
  3. Then we assigned a task role, which provides the task's inner job with the required permissions. In this case, the role provides read/write access to a specific S3 bucket which is all I need.
  4. We also provided an execution role, which is required for the task to support AWS logs as the log driver. If you want to know how to create this role, you can find the full instruction over here.
  5. After that, we defined a single container for our task, with an image pointing to our Spark Image.
  6. Finally, we pass the log group name to the image to use it.

Run the Task

At this point, we have everything we need to trigger the task. However, I intentionally made a generic Image. The main entry point of the container runs spark-shell. When we want to use it to run a Spark application, we need to override this default entry point. Here is the command:

$ aws ecs run-task \\
    --cluster SparGate \\                                               # 1
    --launch-type FARGATE \\                                            # 2
    --task-definition SimpleSpark:1 \\                                  # 3
    --network-configuration '{
      "awsvpcConfiguration": {
        "subnets": ["subnet-xxxxx"],                                   # 4 
        "securityGroups": ["sg-xxxxx"],                
        "assignPublicIp": "ENABLED"                                    # 5
      }
    }' \\
    --overrides='{
      "containerOverrides": [
        {"name": "SimpleSpark",                                        # 6
         "command": [                                                 
           "spark-submit", 
           "s3a://xxxx/simple-spark-assembly-0.1.0.jar",
           "--source-path", "s3a://xxxx/source/sample_data.json",
           "--dest-path", "s3a://xxxx/destination/"]}
      ]
    }'
  1. We define the target cluster for the task.
  2. Set the launch type to FARGATE which is the only acceptable type by our cluster.
  3. Provide the task definition. Note to the version number attached to the task name. Every time you deploy a new task with the same name (running the ECS register-task-definition command), it'll create a new version.
  4. A subnet and a security group on the target VPC to run the task on. Since in this blog post we plan to use S3, it’s important to make sure an S3 Endpoint is attached to the VPC. Otherwise, we can’t read or write objects on S3.
  5. Enabling public IP assignment is required since the Docker image needs to be fetched from outside of the AWS environment (e.g Docker hub). However, if you are using ECR with access over your VPC, you can set this to DISABLED.
  6. I override the container command and as you see I’m passing the application's jar, together with source/dest paths over my S3 bucket.

And that's all needed. After a couple of seconds, you can see your task marked as STOPPED on the ECS cluster and you can read the logs for the status check.

Sources

comments powered by Disqus
Made with + in Amsterdam