Techno Blender
Digitally Yours.

Test Driving Delta Lake 2.0 on AWS EMR — 7 Key Learnings | by Irfan Elahi | Oct, 2022

0 51


What I learned after using Delta Lake 2.0 on AWS EMR along with installation steps and performance benchmarks

Photo by Luís Sousa on Unsplash

If you have read my previous article about getting started with Delta Lake in AWS, you would have got the fundamental context and rationale that why offerings like Delta Lake are gaining traction and what type of use-cases they address. The article presented simple and easy steps to get started with delta lake and even though you can use the approach there to address certain some simple use-cases, there were a few limitations w.r.t scalability and cataloguing. Thus, if you want to work on relatively complex use-cases that warrant the following:

  • Using scalable infrastructure i.e. comprising of multiple VMs doing the distributed computations on Spark on big data sets with the added flexibility of being elastic i.e. ability to easily scale up or down as required
  • Using AWS Glue as Catalogue (i.e. configuring it as Hive metastore for Apache Spark)

and if you don’t want to go down the path of configuring the above by yourself, then there are a few options:

  • Consider commercial vendors offering Delta Lake solutions e.g. Databricks
  • Use Amazon Elastic MapReduce (EMR) in AWS which provides managed Hadoop framework and configure Delta Lake in it

If for some reason, the first option isn’t a good fit for you and you want to employ AWS native service, then EMR can be a way forward in this case. However, one major call-out is that EMR doesn’t natively support Delta Lake (like Databricks does). Thus, there will be some configurations involved to configure Delta Lake in EMR.

I recently worked on a use-case where I prototyped Delta Lake in EMR and wanted to use this article as an opportunity to share the key learnings from the experience along with the instructions on how to set it up as well.

Firstly, lets talk about configuring Delta Lake on EMR. I specifically wanted to use Delta Lake 2.0 as it offered a few propositions that I wanted to use e.g. z-ordering. You can read more about Delta Lake 2.0 and what if offers here.

The core steps to configure Delta Lake 2.0 on EMR are as follows (or at least that’s what I followed):

  • Launch EMR with the following services: Spark, Hadoop
  • Under AWS Glue Data Catalog settings, select Use for Spark table metadata.
  • Ensure that the EMR is using somewhat recent version of Python e.g. I used Python 3.9 (more on that later)
  • Ensure that your EMR master node can reach out to the internet e.g. via NAT Gateway or HTTP/HTTPS proxy as the setup will require downloading Delta Lake 2.0 dependencies from Maven repository once at the start.
  • PySpark shell — Access EMR Master Node via SSH and run this for running PySpark shell with Delta Lake 2.0 dependencies properly configured:
pyspark --packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC
  • Spark Submit Jobs — For running Spark Submit jobs that run your .py script in the EMR cluster, run this:
spark-submit --packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC helloworld.py

whereas helloworld.py looks like this:

In the script:

  • We import SparkSession from pyspark.sql module as it will be used to build Spark object in the next step.
  • We create Spark Object and specify its configuration. Specifically, we configure Spark to use Delta Lake at this step (Line # 5, Line #6)
  • We make sure that AWS Glue is used as Hive Metastore during our Spark Submit jobs (Line # 10, Line # 11). As you may have noticed, we don’t need to set this configuration while running PySpark shell but this is required while running spark-submit. Else, Spark won’t be integrated with Glue Catalog and won’t be able to use Database and Tables defined there.
  • We read a Delta Lake table in a Spark data frame (Line #15), create a temporary view on it (Line # 16) and then print it’s number of rows using Spark SQL (Line # 17)

You can use this as a basic template for writing your Spark Applications and running via spark-submit in EMR.

  • Tip: If you have deployed EMR in a subnet which requires an HTTP/HTTPS proxy for internet access, then also include the following properties:
-Dhttp.proxyHost=<proxy_hostname> -Dhttp.proxyPort=<proxy_port> -Dhttps.proxyHost=<proxy_hostname> -Dhttps.proxyPort=<proxy_port>"

Firstly, some positive learnings:

#1 — It works!

With the setup highlighted above, it was pleasing to see that Delta Lake 2.0 on EMR works! I was able to use the core Delta Lake features like Updates/Deletes on Delta tables in S3 without any issues. Also, upon increasing or decreasing the number of Core nodes in EMR, I could see that the corresponding change in resources being used for Spark jobs and thus the gains in overall performance.

#2 — Delta Lake Tables in AWS Glue

When you create a Delta table using Spark SQL in EMR (with Glue as metastore), it works and tables get created in AWS Glue Catalog. It can be verified if you go to Table’s definition in AWS Glue and access it’s advanced properties where it shows spark.sql.sources.provider is delta

However, unlike other table types (e.g. Parquet, CSV), it shows table schema as follows:

where it just shows columns as array type even if you define them properly with types like int and varchar during your table DDL. From readability point of view in Management Console, it’s not the best representation. But if you query the table in Spark SQL, it works fine and shows the individual columns with their correct types.

#3 — Python Version Conundrum

At the time of writing, I used emr-6.7.0 release label which had Spark 3.2.1 version in it. Interestingly in this release, EMR comes pre-installed with Python 3.7. I ran into a number of challenges while trying to properly configure Delta Lake 2.0 on Python 3.7. Thus, I had to follow the following steps:

  • Create an AMI with somewhat latest version of Python (Python 3.9 in my case) installed in it.
  • Launch EMR with the custom AMI

Interestingly, when I launched EMR with the custom AMI, it still had Python 3.7 as the default version for PySpark. To address this, I had to submit a configuration request to change the default version to Python 3.9. You can follow this article to understand how it’s done.

#4 — Z-Ordering Failure

I wanted to leverage z-ordering capability, a multi-dimensional clustering technique, of Delta Lake 2.0 as it appears to provide performance gains as it dramatically reduces the amount of data that Spark needs to read to process a query. Here’s how it can be done in Python:

You basically specify a column on which to perform z-ordering e.g. a timestamp column which is used in query predicates.

However, when trying to use that in Delta Lake on EMR setup, it continued to fail. It initiated the z-ordering process and progressed through a few stages but continued to fail before getting completed (with errors like containers getting lost and tasks getting killed).

Z-Ordering is a time consuming process when done on a whole table particularly when it’s huge. As shown in line # 9 in the code above, it can be done on a specific partition (provided your Delta Table is partitioned). My assumption was that Z-Ordering could at least work on a specific partition with less volume of data but it didn’t. I tried scaling-out the cluster by adding more task nodes but that didn’t help either.

#5 — No Sub-Queries support in DELETE or UPDATE Statements

If you intend to use sub-queries in your DELETE or UPDATE statements on Delta Tables e.g.

DELETE FROM table_name_1 WHERE EXISTS (Select 1 from table_name_2 where table_name_2.key = table_name_1.key)

Apparently, it’s not supported. It’s an open issue and can be tracked here

Interestingly, Delta Lake on Databricks does support sub-queries in this context. An example can be found here.

#6 — Optimizing Upsert/Merge in Delta Tables via Partition Pruning

Upsert is supported out-of-the-box in Delta Lake via Merge SQL command. However, I’ve found that if your table is partitioned and if you could partition pruning during merge, it could significantly improve performance. Refer to this article for more information.

#7 — Delta Lake 2.0 Installation Caveats

You may have noticed that the way I am managing Delta Lake 2.0 dependencies is by allowing it to be downloaded from Maven repo. Once downloaded, they will be locally cached and will be reused. This isn’t the ideal way of managing dependencies and there are or should be better ways of doing it e.g. making the Delta Lake 2.0 related jars available to the EMR cluster. You will probably find many tutorials online about different ways of going about it e.g. copying Delta Lake jar to a specific folder (e.g. /usr/lib/spark/jars), using a JSON conf while launching EMR cluster etc. For me, none of those approaches worked. What worked, for Delta Lake 1.0, was to put Delta Lake jars in a folder e.g. /home/hadoop/extrajars/ and updating spark-defaults.conf (located at /etc/spark/conf/spark-defaults.conf) to refer to the jar path in both spark driver and executor class path.

But… for Delta Lake 2.0 the approach didn’t work either as I kept getting Class Not Found related errors. Thus, I resorted to the approach I’ve highlighted in the article above i.e. let the dependencies be downloaded once from Maven and reused going forward.

Performance Benchmarks

Performance is always a major concern for such prototypes so I just thought to share my findings as well. With the following specifics of the setup:

  • EMR Cluster size: 1 Master Node (m4.large), 2 Task Nodes (m5.xlarge)
  • Raw batch size: 20 MB (gzipped, CSV)
  • Number of rows when de-duped on PK: around 5000
  • Delta Table Size: 600 GB (partitioned on year/month derived columns)
  • Relevant Partition size (based on the data in raw batch): 5GB (important as I used partition pruning logic while merging as highlighted above)
  • Elapsed time to run merge: 5 min approx

Is it good or bad? The answer is highly contextual and depends on a number of factors e.g. cost-benefit ratio, potential alternatives etc so I will let you be the judge of it.

Now, understandably, the article seems to carry somewhat negative connotations of the setup i.e. Delta Lake 2.0 on EMR however that isn’t the original intent of it. The key takeaway is that the setup works but with some quirks that I’ve highlighted. Now there is a possibility that I would’ve addressed these quirks if I had invested more time and efforts to them. Or there is a possibility that these will get addressed in upcoming EMR or Delta Lake 2.0 releases. If you have the appetite to embrace the setup and address the challenges highlighted, go for it. Or potentially consider commercial vendors offering such solutions with good support.


What I learned after using Delta Lake 2.0 on AWS EMR along with installation steps and performance benchmarks

Photo by Luís Sousa on Unsplash

If you have read my previous article about getting started with Delta Lake in AWS, you would have got the fundamental context and rationale that why offerings like Delta Lake are gaining traction and what type of use-cases they address. The article presented simple and easy steps to get started with delta lake and even though you can use the approach there to address certain some simple use-cases, there were a few limitations w.r.t scalability and cataloguing. Thus, if you want to work on relatively complex use-cases that warrant the following:

  • Using scalable infrastructure i.e. comprising of multiple VMs doing the distributed computations on Spark on big data sets with the added flexibility of being elastic i.e. ability to easily scale up or down as required
  • Using AWS Glue as Catalogue (i.e. configuring it as Hive metastore for Apache Spark)

and if you don’t want to go down the path of configuring the above by yourself, then there are a few options:

  • Consider commercial vendors offering Delta Lake solutions e.g. Databricks
  • Use Amazon Elastic MapReduce (EMR) in AWS which provides managed Hadoop framework and configure Delta Lake in it

If for some reason, the first option isn’t a good fit for you and you want to employ AWS native service, then EMR can be a way forward in this case. However, one major call-out is that EMR doesn’t natively support Delta Lake (like Databricks does). Thus, there will be some configurations involved to configure Delta Lake in EMR.

I recently worked on a use-case where I prototyped Delta Lake in EMR and wanted to use this article as an opportunity to share the key learnings from the experience along with the instructions on how to set it up as well.

Firstly, lets talk about configuring Delta Lake on EMR. I specifically wanted to use Delta Lake 2.0 as it offered a few propositions that I wanted to use e.g. z-ordering. You can read more about Delta Lake 2.0 and what if offers here.

The core steps to configure Delta Lake 2.0 on EMR are as follows (or at least that’s what I followed):

  • Launch EMR with the following services: Spark, Hadoop
  • Under AWS Glue Data Catalog settings, select Use for Spark table metadata.
  • Ensure that the EMR is using somewhat recent version of Python e.g. I used Python 3.9 (more on that later)
  • Ensure that your EMR master node can reach out to the internet e.g. via NAT Gateway or HTTP/HTTPS proxy as the setup will require downloading Delta Lake 2.0 dependencies from Maven repository once at the start.
  • PySpark shell — Access EMR Master Node via SSH and run this for running PySpark shell with Delta Lake 2.0 dependencies properly configured:
pyspark --packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC
  • Spark Submit Jobs — For running Spark Submit jobs that run your .py script in the EMR cluster, run this:
spark-submit --packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC helloworld.py

whereas helloworld.py looks like this:

In the script:

  • We import SparkSession from pyspark.sql module as it will be used to build Spark object in the next step.
  • We create Spark Object and specify its configuration. Specifically, we configure Spark to use Delta Lake at this step (Line # 5, Line #6)
  • We make sure that AWS Glue is used as Hive Metastore during our Spark Submit jobs (Line # 10, Line # 11). As you may have noticed, we don’t need to set this configuration while running PySpark shell but this is required while running spark-submit. Else, Spark won’t be integrated with Glue Catalog and won’t be able to use Database and Tables defined there.
  • We read a Delta Lake table in a Spark data frame (Line #15), create a temporary view on it (Line # 16) and then print it’s number of rows using Spark SQL (Line # 17)

You can use this as a basic template for writing your Spark Applications and running via spark-submit in EMR.

  • Tip: If you have deployed EMR in a subnet which requires an HTTP/HTTPS proxy for internet access, then also include the following properties:
-Dhttp.proxyHost=<proxy_hostname> -Dhttp.proxyPort=<proxy_port> -Dhttps.proxyHost=<proxy_hostname> -Dhttps.proxyPort=<proxy_port>"

Firstly, some positive learnings:

#1 — It works!

With the setup highlighted above, it was pleasing to see that Delta Lake 2.0 on EMR works! I was able to use the core Delta Lake features like Updates/Deletes on Delta tables in S3 without any issues. Also, upon increasing or decreasing the number of Core nodes in EMR, I could see that the corresponding change in resources being used for Spark jobs and thus the gains in overall performance.

#2 — Delta Lake Tables in AWS Glue

When you create a Delta table using Spark SQL in EMR (with Glue as metastore), it works and tables get created in AWS Glue Catalog. It can be verified if you go to Table’s definition in AWS Glue and access it’s advanced properties where it shows spark.sql.sources.provider is delta

However, unlike other table types (e.g. Parquet, CSV), it shows table schema as follows:

where it just shows columns as array type even if you define them properly with types like int and varchar during your table DDL. From readability point of view in Management Console, it’s not the best representation. But if you query the table in Spark SQL, it works fine and shows the individual columns with their correct types.

#3 — Python Version Conundrum

At the time of writing, I used emr-6.7.0 release label which had Spark 3.2.1 version in it. Interestingly in this release, EMR comes pre-installed with Python 3.7. I ran into a number of challenges while trying to properly configure Delta Lake 2.0 on Python 3.7. Thus, I had to follow the following steps:

  • Create an AMI with somewhat latest version of Python (Python 3.9 in my case) installed in it.
  • Launch EMR with the custom AMI

Interestingly, when I launched EMR with the custom AMI, it still had Python 3.7 as the default version for PySpark. To address this, I had to submit a configuration request to change the default version to Python 3.9. You can follow this article to understand how it’s done.

#4 — Z-Ordering Failure

I wanted to leverage z-ordering capability, a multi-dimensional clustering technique, of Delta Lake 2.0 as it appears to provide performance gains as it dramatically reduces the amount of data that Spark needs to read to process a query. Here’s how it can be done in Python:

You basically specify a column on which to perform z-ordering e.g. a timestamp column which is used in query predicates.

However, when trying to use that in Delta Lake on EMR setup, it continued to fail. It initiated the z-ordering process and progressed through a few stages but continued to fail before getting completed (with errors like containers getting lost and tasks getting killed).

Z-Ordering is a time consuming process when done on a whole table particularly when it’s huge. As shown in line # 9 in the code above, it can be done on a specific partition (provided your Delta Table is partitioned). My assumption was that Z-Ordering could at least work on a specific partition with less volume of data but it didn’t. I tried scaling-out the cluster by adding more task nodes but that didn’t help either.

#5 — No Sub-Queries support in DELETE or UPDATE Statements

If you intend to use sub-queries in your DELETE or UPDATE statements on Delta Tables e.g.

DELETE FROM table_name_1 WHERE EXISTS (Select 1 from table_name_2 where table_name_2.key = table_name_1.key)

Apparently, it’s not supported. It’s an open issue and can be tracked here

Interestingly, Delta Lake on Databricks does support sub-queries in this context. An example can be found here.

#6 — Optimizing Upsert/Merge in Delta Tables via Partition Pruning

Upsert is supported out-of-the-box in Delta Lake via Merge SQL command. However, I’ve found that if your table is partitioned and if you could partition pruning during merge, it could significantly improve performance. Refer to this article for more information.

#7 — Delta Lake 2.0 Installation Caveats

You may have noticed that the way I am managing Delta Lake 2.0 dependencies is by allowing it to be downloaded from Maven repo. Once downloaded, they will be locally cached and will be reused. This isn’t the ideal way of managing dependencies and there are or should be better ways of doing it e.g. making the Delta Lake 2.0 related jars available to the EMR cluster. You will probably find many tutorials online about different ways of going about it e.g. copying Delta Lake jar to a specific folder (e.g. /usr/lib/spark/jars), using a JSON conf while launching EMR cluster etc. For me, none of those approaches worked. What worked, for Delta Lake 1.0, was to put Delta Lake jars in a folder e.g. /home/hadoop/extrajars/ and updating spark-defaults.conf (located at /etc/spark/conf/spark-defaults.conf) to refer to the jar path in both spark driver and executor class path.

But… for Delta Lake 2.0 the approach didn’t work either as I kept getting Class Not Found related errors. Thus, I resorted to the approach I’ve highlighted in the article above i.e. let the dependencies be downloaded once from Maven and reused going forward.

Performance Benchmarks

Performance is always a major concern for such prototypes so I just thought to share my findings as well. With the following specifics of the setup:

  • EMR Cluster size: 1 Master Node (m4.large), 2 Task Nodes (m5.xlarge)
  • Raw batch size: 20 MB (gzipped, CSV)
  • Number of rows when de-duped on PK: around 5000
  • Delta Table Size: 600 GB (partitioned on year/month derived columns)
  • Relevant Partition size (based on the data in raw batch): 5GB (important as I used partition pruning logic while merging as highlighted above)
  • Elapsed time to run merge: 5 min approx

Is it good or bad? The answer is highly contextual and depends on a number of factors e.g. cost-benefit ratio, potential alternatives etc so I will let you be the judge of it.

Now, understandably, the article seems to carry somewhat negative connotations of the setup i.e. Delta Lake 2.0 on EMR however that isn’t the original intent of it. The key takeaway is that the setup works but with some quirks that I’ve highlighted. Now there is a possibility that I would’ve addressed these quirks if I had invested more time and efforts to them. Or there is a possibility that these will get addressed in upcoming EMR or Delta Lake 2.0 releases. If you have the appetite to embrace the setup and address the challenges highlighted, go for it. Or potentially consider commercial vendors offering such solutions with good support.

FOLLOW US ON GOOGLE NEWS

Read original article here

Denial of responsibility! Techno Blender is an automatic aggregator of the all world’s media. In each content, the hyperlink to the primary source is specified. All trademarks belong to their rightful owners, all materials to their authors. If you are the owner of the content and do not want us to publish your materials, please contact us by email – [email protected]. The content will be deleted within 24 hours.

Leave a comment