Idempotent Writes to Delta Lake Tables
by Yousry Mohamed, Jul 2022


Walkthrough using open source delta lake

https://unsplash.com/photos/JI0KxozvOtQ

According to Wikipedia:

Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application.

Non-technical examples include elevator call buttons and crosswalk buttons. Having an idempotent software API is a critical characteristic in many situations. One such situation is Spark structured streaming. Structured streaming guarantees exactly-once fault tolerance by default using checkpoints and write-ahead logs, but that guarantee does not apply when the foreachBatch API is used.

This API is very powerful and is usually needed to apply Spark SQL batch APIs on streaming data, write the output to multiple destinations, or call APIs like MERGE INTO to update downstream delta tables. The major problem with this approach is that if the stream fails or is interrupted inside foreachBatch, there is a potential for duplicates in the target tables. In this post, we will see the impact of such a (not so unusual) situation and how to handle it.

Fortunately, Delta Lake has been made completely open source, which makes it easy to understand how a feature like idempotent table writes is implemented and what its limits are.

That should be easy; all we need is the latest preview version of delta lake and a Python 3.7+ virtual environment.

  • You can use Mac/Linux/Windows, but on Windows it is better to use WSL2. Spark is involved here and you don’t want to waste your time with winutils and other steps to get Spark running on Windows.
  • Create a new Python environment and activate it, use your preferred environment management tool.
  • Run pip install delta-spark==2.0.0rc1 .
  • Run pyspark --version to confirm your installation, it should show 3.2.1 as it comes bundled with delta-spark .

We all love the happy scenario! At least it feels good to see the code work as expected and produce some outcome. Let’s build a basic Spark structured streaming setup. The source will be a delta table with 10 commits where each commit is a single file. The destination is another delta table, but the writing will be done using the foreachBatch API rather than a classic delta streaming sink.

Copy the contents of the following gist and save it as producer.py.

The producer script does the following (a minimal sketch follows this list):

  • Starts with some imports and prepares a folder to hold the delta tables and the streaming checkpoint.
  • Creates a Spark session with the delta lake dependency wired up for us.
  • Creates a dummy DataFrame and appends it to a delta table location called source. This process is repeated 10 times; each append (commit) has 10 records and is saved in a single file, hence repartition(1). It is designed this way because the streaming application will be configured to pull one data file per micro-batch (a one-to-one mapping to a commit) just to make things easy to understand. All in all, our source table has 100 records split into 10 files.
  • The last section of the script echoes some metrics and details about the written data to confirm it is in good shape for the next step.
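Here is a minimal sketch of what producer.py could look like, assuming a /tmp/idempotent-writes base folder and a simple id/group schema (both assumptions; the original gist may differ in details):

import shutil

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

base = "/tmp/idempotent-writes"   # assumed base folder
source = f"{base}/source"

# Start fresh: remove tables and checkpoint left over from previous runs.
shutil.rmtree(base, ignore_errors=True)

builder = (
    SparkSession.builder.appName("producer")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# 10 commits, each one written as a single file holding 10 records.
for commit in range(10):
    df = (
        spark.range(0, 10)                 # id column: 0..9
        .withColumn("group", lit(commit))  # group column: the commit number
        .repartition(1)                    # one file per commit
    )
    df.write.mode("append").format("delta").save(source)

# Echo some details to confirm the data is in good shape for the next step.
print(f"Source record count: {spark.read.format('delta').load(source).count()}")

With maxFilesPerTrigger set to 1 on the consumer side, each of those 10 files becomes exactly one micro-batch.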

Within the same Python environment from the previous step, run python ./producer.py and you should get something like the following.

Image by author

Now, create another file called consumer.py and populate it with the following gist (a minimal sketch follows the walkthrough below).

This file is the other side of the coin for the producer:

  • It starts with the classic imports and creating a Spark session.
  • It then defines the foreachBatch callback function, which simply prints the batch Id, echoes the contents of the micro-batch and finally appends it to the target delta table. This is the bare minimum logic that can be used; printing the batch Id and the micro-batch is just for illustration purposes.

Tip: Any Spark action you perform on the micro-batch triggers the whole lineage, so it might be useful to cache the micro-batch if it will be used several times within the function.

  • The next section in the script is the streaming query, which:
    – Reads a stream of data from the delta table prepared by the producer script.
    – Pulls a single parquet data file from the source per micro-batch by using maxFilesPerTrigger.
    – Sets up the checkpoint location and wires up the foreachBatch handler.
    – Triggers the stream every 2 seconds, runs it for a maximum of 30 seconds and then continues to the next step. If we used a delta lake compatible with Spark 3.3 (which is not the case for now, at least for OSS delta), we could have used the availableNow streaming trigger. That would have relieved us from specifying a timeout and it would still respect other options like maxFilesPerTrigger. The assumption here is that on most machines all the source table files will be processed in less than 30 seconds.
  • And finally, the number of records in the target table is printed and, guess what, that should print 100 unless something is really broken.
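Here is a minimal sketch of what consumer.py could look like, under the same assumptions as the producer sketch above (paths, schema and session setup); the original gist may differ in details.

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

base = "/tmp/idempotent-writes"   # assumed base folder
source = f"{base}/source"
target = f"{base}/target"
checkpoint = f"{base}/checkpoint"

builder = (
    SparkSession.builder.appName("consumer")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

def batch_function(df, batch_id):
    # Print the batch Id and contents for illustration, then append to target.
    print(f"Processing micro-batch {batch_id}")
    df.show()
    df.write.mode("append").format("delta").save(target)

query = (
    spark.readStream.format("delta")
    .option("maxFilesPerTrigger", 1)       # one source file per micro-batch
    .load(source)
    .writeStream
    .option("checkpointLocation", checkpoint)
    .foreachBatch(batch_function)
    .trigger(processingTime="2 seconds")
    .start()
)

# Run for at most 30 seconds, then stop and check the target record count.
query.awaitTermination(30)
query.stop()
print(f"Target record count: {spark.read.format('delta').load(target).count()}")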

Now run python ./consumer.py and let’s see if it will be really a happy scenario 😅. The expected outcome will look like the following.

Image by author

As you can see, the Id and contents of each micro-batch are printed, and the final record count of 100 in the target table proves that everything works as expected.

https://imgflip.com/i/6lmfss

Alright, it is time to make things crash. To start fresh, you can delete the checkpoint and target table folders, but it is much easier to just run the producer once again, which cleans up all the relevant folders including the checkpoint and the target table.

This time we will simulate something going wrong directly after writing micro-batch number 5. Modify the batch_function code inside the consumer file to be as follows:

def batch_function(df, batch_id):
    print(f"Processing micro-batch {batch_id}")
    df.show()
    df.write.mode("append").format("delta").save(target)
    if batch_id == 5:
        raise RuntimeError("Simulated exception!")

The last two lines raise a dummy exception after writing the data of batch Id 5. This will cause the stream to crash and fail to write the checkpoint file. Effectively, this simulates an impact on the target system that is not recorded or tracked. Now let’s give it a go.

python ./consumer.py
Image by author

As expected, the program crashed, but we would like to confirm whether the data has been written to the target or not. You can run an isolated PySpark shell session to verify that.

pyspark --packages io.delta:delta-core_2.12:2.0.0rc1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
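Inside that shell, a quick check like the following confirms what landed in the target table (the group column is part of the schema assumed in the sketches above):

# Count the records written so far and peek at them per group.
df = spark.read.format("delta").load("/tmp/idempotent-writes/target")
print(df.count())                                    # expected: 60
# If micro-batches map one-to-one to commits, groups 0..5 with 10 records each.
df.groupBy("group").count().orderBy("group").show()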

The number of records is 60, which is right because batch Ids start from 0, and the contents of batch Id 5 are there in the target table, so the write operation did happen even though the checkpoint was never recorded.

Now imagine what would happen if we remove the simulated exception and re-run the consumer app!

The streaming checkpoint folder does not have any persisted data regarding batch Id 5, so the stream will resume at batch Id 5, which will produce duplicate data in the target delta table.

So, remove the simulated exception lines and re-run the consumer. The stream should start at batch Id 5, and the number of records in the target table will be 110, not 100, which is an indication of data corruption (duplication in our case).

Image by author

Because this is a very common situation (stream restarts are inevitable), the delta lake team came up with a solution that works quite well when writing from a stream to a delta table. Keep in mind that it is not a universal solution; for example, it does not make sense when writing to plain parquet files.

Anyway, because all those advanced delta lake features are available on GitHub now thanks to delta lake’s recent open sourcing, we can inspect the source code and understand how the feature works before even trying it.

The core solution revolves around two options that can be set on the write performed inside the streaming query. The first one is txnAppId, which holds an Id for the application writing to the target delta table. This is needed because a delta table can have multiple streams writing into it in parallel, and each of them could be a separate standalone application. The other option, txnVersion, is a serial number that identifies the version of the data being written by that application. In the foreachBatch case, that would usually be the batch Id. The pseudo code for reading those two options looks like:

txnVersion = options.get("txnVersion")
txnAppId = options.get("txnAppId")

Next, the writer class simply skips the write if the received version has already been seen in the delta table (for the same app Id). The delta table transaction log does not have to store all the historical versions; only the latest one per app Id is enough. I will come back to that later. The pseudo code of that logic looks like this:

def hasTransactionBeenExecutedBefore(table, txnVersion, txnAppId):
    latestSeenVersion = table.getLatestSeenVersionForApp(txnAppId)
    if latestSeenVersion >= txnVersion:
        return true
    else:
        return false

Now let’s start fresh:

  • Run the producer
  • Update the df.write call in the consumer file to include .option("txnAppId", "idempotent_app").option("txnVersion", batch_id) after .write (see the sketch after this list).
  • Run the consumer with the simulated exception.
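For clarity, here is a sketch of the updated batch_function with the two options added and the simulated exception still in place:

def batch_function(df, batch_id):
    print(f"Processing micro-batch {batch_id}")
    df.show()
    (
        df.write
        # Idempotent write: delta skips it if this (appId, version) pair
        # has already been committed to the target table.
        .option("txnAppId", "idempotent_app")
        .option("txnVersion", batch_id)
        .mode("append")
        .format("delta")
        .save(target)
    )
    if batch_id == 5:
        raise RuntimeError("Simulated exception!")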

The program should fail as expected, and micro-batch 5 will be written to the target delta table. What can be checked now are the delta log files of the target table.

Open any one of the target delta table commit files like /tmp/idempotent-writes/target/_delta_log/00000000000000000003.json in your preferred text editor.

The first line is a delta lake protocol action called a transaction identifier. As you can see, it stores the fixed application Id we sent as an option and a transaction version, which is the batch Id. Behind the scenes, delta lake collects all those JSON files (and sometimes a parquet checkpoint file, which is written after every 10 commits) to constitute a snapshot view of the delta table state. In our case, the state holds the latest version written by each application Id (a single application here). Hence, with the next write, delta lake can decide whether it has seen the same write attempt before and, based on that, proceed or skip it.

Enough theory!

Remove the simulated exception and run the consumer once again.

Boom! The target table has 100 records identical to the source, and you can even inspect the target table to make sure there are no duplicates (the id and group columns can act as a composite PK), as shown below.
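A quick way to verify that, assuming the id/group schema used in the sketches above:

# Any composite key appearing more than once would indicate duplicates.
dups = (
    spark.read.format("delta").load("/tmp/idempotent-writes/target")
    .groupBy("id", "group").count()
    .filter("count > 1")
)
dups.show()   # expected to be empty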

The concept is simple and cool and might be useful in non-streaming scenarios as well.

The transaction identifier action will always be recorded in the delta log JSON or parquet checkpoint files. As I mentioned, delta lake does not need to pull the full list of versions to implement this logic; only the latest entry per application Id is enough. For example, here is how the txn action appears in the delta log after 100 micro-batches on the target table.
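The entry would look roughly like the following (the lastUpdated timestamp is illustrative and the app Id is the one assumed earlier); note that only the latest version (99 after 100 micro-batches numbered from 0) is kept for that app Id:

{"txn":{"appId":"idempotent_app","version":99,"lastUpdated":1656866211000}}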

This feature is pretty much necessary when writing to delta lake tables from the foreachBatch API using DataFrameWriter. Without it, duplicates may happen, and they could be very hard to detect as they happen silently.

Remember to clean up your test data.

rm -r /tmp/idempotent-writes

The caveats are:

  • This feature is available only from the DataFrameWriter API. It cannot be used (as of now) with commands like MERGE INTO. Similarly, if writing to parquet or JDBC, you need to solve the idempotency problem yourself.
  • Copying one warning from the docs:

If you delete the streaming checkpoint and restart the query with a new checkpoint, you must provide a different appId; otherwise, writes from the restarted query will be ignored because it will contain the same txnAppId and the batch ID would start from 0.

The above also applies when you try to start fresh but, instead of recreating the target delta table, you choose to just delete the existing records. The table will be empty data-wise, but the transaction log will still have the traces of the previous streaming run and hence will skip all batch Ids that have been seen before.
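One simple way to respect that warning, sketched here as an assumption rather than something prescribed by the docs, is to treat the app Id as a value you bump whenever the checkpoint or the target table is recreated:

# Bump this constant whenever the checkpoint or the target table is recreated,
# so the restarted query is not mistaken for a replay of old batch Ids.
TXN_APP_ID = "idempotent_app_v2"

def batch_function(df, batch_id):
    (
        df.write
        .option("txnAppId", TXN_APP_ID)
        .option("txnVersion", batch_id)
        .mode("append")
        .format("delta")
        .save(target)
    )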

In that case, if you change the logging level to INFO, you will see messages like the following indicating that micro-batches are being skipped.

22/07/03 21:36:51 INFO WriteIntoDelta: Transaction write of version 0 for application id idempotent_app has already been committed in Delta table id d5e0a924-683a-45a0-a299-718b89409b6a. Skipping this write.

This is not a bad thing, especially when it happens in the normal case of resuming a failed streaming app, where the last committed micro-batch will not be re-written to the target delta table.

Hopefully you now have a feel for how idempotent writes to delta tables can make your streaming workloads robust to transient failures.

Happy streaming!

