Getting started with Delta Lake & Spark in AWS— The Easy Way | by Irfan Elahi | Aug, 2022

A step-by-step tutorial to configure Apache Spark and Delta Lake on EC2 in AWS along with code examples in Python

Photo by Joshua Sortino on Unsplash

If you have worked on engineering data lake or lakehouse solutions, chances are that you have employed (or at least heard of) decoupled, distributed computation frameworks running against the scalable storage layer of your data lake platform. Though the list of such computation frameworks keeps growing, Apache Spark has continued to evolve and to consistently prove its robustness in the big data processing landscape. In line with that success, a number of vendors offer varying flavours (e.g. managed, serverless, containerized) of Apache Spark based solutions (e.g. Databricks, Amazon EMR). Additionally, there has been a surge in solutions addressing the ACID limitations of existing data lakes, and you may have heard of Delta Lake, Hudi and Iceberg in this context. All of this is fascinating, but it can be a bit overwhelming for beginners. You may want to start small to explore the potential of Apache Spark on an ACID-compliant data lake, e.g. by having it properly configured on a VM (EC2) in AWS. Or you may want to address specific use-cases that don’t require vendor-based offerings or a fleet of instances for massive distributed processing. Some examples of such use-cases are:

  • You find that your existing ETL processes don’t have SQL-like processing capabilities. For example, if you are using Python, you may be relying mostly on native data structures like lists and dictionaries and on modules like csv and pandas to achieve the desired transformations. You believe that being able to run SQL expressions interchangeably with more abstract and scalable data structures like dataframes could accelerate and simplify your development, e.g. instead of writing many lines of code to read data from a landing directory and split it into multiple target directories based on a column’s value, you can achieve this with a couple of lines of Spark SQL.
  • You find S3 Select to be pretty limiting and want a better way to run SQL on data in S3, in addition to Athena.
  • The velocity of your data is near-real-time, and at any point in time the amount of data being processed fits within the resources (CPU and RAM) of a single VM, so it doesn’t require distributed computation.
  • You want to prototype an Apache Spark and Delta Lake based solution to substantiate its business value by starting small, i.e. by running it on an EC2 instance of your choice, without standing up a full-blown vendor-based solution for your initial prototype. The same reasoning applies if you simply want to learn these technologies.

If any of the above resonates with you, then you will probably find this article helpful, as it explains in a simple way how to get started with Apache Spark and Delta Lake on an EC2 instance in AWS.

Pre-Requisites

To follow along, you will require the following:

  • An AWS Account
  • An EC2 instance (any size works, but I’d suggest at least 2 vCPUs) configured with the following:
    – Python (ideally > 3.8), optionally with a virtual environment configured
    – A JDK (ideally Oracle’s, but OpenJDK works fine as well; I’ve specifically found Oracle JDK 11 to be quite reliable for Apache Spark 3)
  • An S3 bucket (where you will hydrate/store data for your Delta Lake)
  • An IAM role attached to your EC2 instance that allows read/write access to the S3 bucket

Steps

The first step is to install PySpark in your (virtual) environment. At the time of writing, I’ve found PySpark 3.2.2 to be quite stable when used in conjunction with the Delta Lake dependencies, so that is the version I’ll be using in this article.

If you are using pip to install dependencies in your environment, run this:

pip install pyspark==3.2.2

If all goes well, the PySpark module will be installed in your Python environment! Time to use it now.
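
Below is a minimal sketch of the kind of snippet described in the breakdown that follows. The Delta Lake and hadoop-aws versions, the app name and the extra S3A/catalog settings are assumptions (chosen to match PySpark 3.2.2), so adjust them to your environment:

from pyspark.sql import SparkSession

# Maven coordinates of the jars Spark needs: Delta Lake core and the AWS/S3 connector.
# Versions are assumptions that happen to match PySpark 3.2.2; adjust to your Spark build.
packages = "io.delta:delta-core_2.12:2.0.0,org.apache.hadoop:hadoop-aws:3.3.1"

spark = (
    SparkSession.builder
    .appName("delta-on-ec2")  # hypothetical app name
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # Commonly recommended alongside the extension so Delta works with Spark SQL
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Map the s3:// scheme used in this article to the S3A filesystem implementation
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .getOrCreate()
)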

So here is a high-level breakdown of what’s going on in the code snippet:

  • First, we import the SparkSession class from the pyspark.sql module.
  • Next, we specify the dependencies that Spark needs, e.g. to interact with AWS (S3 in our case) and to use Delta Lake core.
  • Finally, we instantiate the SparkSession object, which serves as the entry point for using Spark in our script. A lot happens during the creation of this object, and you may want to go through the Spark documentation to understand the rationale of each setting, but a few key ones are:
    – spark.sql.extensions: extends the SparkSession to use Delta Lake capabilities
    – spark.jars.packages: comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths
    – spark.sql.sources.partitionOverwriteMode: enables dynamic writes to partitioned data, i.e. an overwrite only replaces the partitions present in the current dataframe instead of deleting all existing partitions first

Tip: If you are behind a corporate HTTP/HTTPS proxy, you can add the following config so that Spark can traverse the proxy and download the specified jars/packages from the Maven repo:

.config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC -Dhttp.proxyHost=proxy_hostname -Dhttp.proxyPort=port -Dhttps.proxyHost=proxy_hostname -Dhttps.proxyPort=port")

Once that’s done, the SparkSession object will be ready for use.

Reading data from S3

Let’s work with a sample dataset. For this article, I’ve chosen employment data from the Stats NZ website (refer to the resources section at the end of the article for the licensing aspects of the dataset).

I uploaded the CSV file to a bucket in my account as well.

Here’s how you can read a CSV file with header in Spark:

df = spark.read.option("recursiveFileLookup", "true").option("header","true").csv("s3://your_bucket/test/machine-readable-business-employment-data-mar-2022-quarter.csv")

Just a note: as Spark follows a lazy execution model, at this stage it won’t have loaded any data into memory. Only when you perform an action (e.g. the .count() or .show() functions) will it load the required data into memory for processing.

Let’s have a sneak peek at what the data looks like:

df.show()

and the output will look something like this:

SQL Based Transformations on data

One of the most powerful features of Spark is the ability to run SQL queries on data. This significantly accelerates and simplifies ETL as you can express pretty powerful data transformation logic via SQL quickly instead of writing lengthy/complex code to achieve the same.

To run SQL queries, you’ll need to create a “temporary view” on the data. This view isn’t registered with the integrated catalog/hive-metastore (e.g. Glue) and will just be local to the current session.

To create a temporary view:

df.createOrReplaceTempView("employment_tbl")

Once done, you can run SQL queries against the data sitting on S3. You can do so by using the spark.sql() function:

spark.sql("select Series_title_2,count(*) as count from employment_tbl group by Series_title_2 order by 2 desc").show(truncate=False)

and the output looks like this:

Here we are running a group-by SQL query to count how many times each unique value in the Series_title_2 column appears.

You can also create a new dataframe from this transformation logic:

df_groupby = spark.sql("select Series_title_2,count(*) as count from employment_tbl group by Series_title_2 order by 2 desc")

Writing data in Delta format to S3

Let’s write this transformed dataframe to S3 in Delta format to materialize our “delta-lake”:

df_groupby.write.format("delta").save("s3://your_bucket/test/sample_data/")

Once done, you can verify from S3 (e.g. via the Management Console) that the data has been written in Delta format:

And if you want to read this data back, in Delta format, in Spark:

df = spark.read.format("delta").load("s3://your_bucket/test/sample_data/")

and if you want to see the data:

df.show()

and here’s how it will look:

Performing Updates on the Data

One of the unique propositions of technologies like Delta Lake is the ability to perform ACID-compliant updates/deletes on data in the data lake. Traditionally, in the Hadoop-influenced big data paradigm, this has been a critical limitation, and data engineers had to resort to various workarounds to achieve the desired update/delete effect in their datasets. But now this capability is supported natively in Delta Lake (and in Hudi and Iceberg).

Let’s assume that you want to update all records where Series_title_2 = ‘Other Services’ and change the value to ‘Others’. One of the ways you can do so is shown below.
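
Here is a minimal sketch of what such a snippet can look like with the Delta Lake Python API. It assumes the delta-spark Python bindings are available (e.g. via pip install delta-spark==2.0.0, matching the Delta jar version) and that the table was written to the same path that is read back in the verification step below:

from delta.tables import DeltaTable

# Reference the Delta table by the S3 path where its data is stored
delta_table = DeltaTable.forPath(spark, "s3://your_bucket/test/delta_format/")

# Change Series_title_2 from 'Other Services' to 'Others' for matching rows
delta_table.update(
    condition="Series_title_2 = 'Other Services'",
    set={"Series_title_2": "'Others'"},
)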

In the above snippet, we created a reference to the Delta table at the path where the table data is stored in Delta format, and then we specified the update logic, i.e. change the Series_title_2 column value from ‘Other Services’ to ‘Others’. When executed, this directly applies the updates to the underlying Delta files on S3.

To verify that the updates took place, let’s read the data back into a dataframe and run some SQL queries:

df = spark.read.format("delta").load("s3://your_bucket/test/delta_format/")
df.createOrReplaceTempView("employment_tbl")
spark.sql("select count(*) from employment_tbl where Series_title_2 = 'Other Services'").show()

This count should be zero:

Let’s run another SQL query that performs a group by to verify that ‘Other Services’ is no longer there and has been replaced with ‘Others’:
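
As a sketch, this can reuse the same temporary view and group-by expression as before:

spark.sql("select Series_title_2,count(*) as count from employment_tbl group by Series_title_2 order by 2 desc").show(truncate=False)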

As you can see, the fourth-last entry in the dataframe is ‘Others’; if you compare it with the output of the group-by query that we ran before, you will find that it was ‘Other Services’.

Using a similar approach, you can delete data as well (see the sketch below). I haven’t used SQL expressions to perform the updates and deletes here, but you can easily do that too.
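
As an illustration, a sketch of a delete, reusing the delta_table reference from the update snippet above (the condition is hypothetical), together with the equivalent SQL expression; the SQL form assumes the Delta SQL extension and catalog settings from the SparkSession sketch earlier:

# Delete rows via the Python API
delta_table.delete("Series_title_2 = 'Others'")

# Or express the same as SQL against the path-based table
spark.sql("DELETE FROM delta.`s3://your_bucket/test/delta_format/` WHERE Series_title_2 = 'Others'")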

Thus, if you have been following along with the article, you have completed the whole workflow: reading raw data (in CSV format), doing some transformations, writing data in Delta format, updating the data and then reading it back in Delta format. This underpins all the more advanced transformations that can be achieved via Spark.

It’s worth pointing out that the approach in this article does have a few limitations:

  • The setup isn’t integrated with a centralized Hive metastore or catalog (e.g. Glue). As a result, if you run CREATE TABLE statements, the tables will not be registered in a central catalog and thus won’t be accessible from other Spark instances or tools like Athena. You can configure the integration with the Glue catalog on your own, but in such cases EMR does offer some benefits, as it provides out-of-the-box integration with Glue.
  • The setup isn’t distributed, i.e. it doesn’t leverage multiple VMs to perform distributed computation.

ACID-compliant data lake technologies like Delta Lake and Hudi, and companies like Databricks that offer commercial solutions built on top of them, are gaining good momentum in the data industry, so it will be a good investment of your time to get across this technology. The best way to learn is by doing: if you find a use-case where this can be valuable, start small, do experiments, break things, build stuff and evolve!

Reference

