
Delta Lake: Keeping It Fast and Clean | by Vitor Teixeira | Feb, 2023



Simplified flow chart on how to keep Delta tables fast and clean (Image by Author)

Keeping Delta tables fast and clean is important for maintaining the efficiency of data pipelines. Delta tables can grow very large over time, leading to slow query performance and increased storage costs. However, there are several operations and trade-offs that can positively influence the speed of the tables.

In this blog post, we’ll use the people10m public dataset, which is available on Databricks Community Edition, to showcase best practices for keeping tables fast and clean using Delta operations while explaining what happens behind the scenes.

We will start by inspecting what is inside the dataset. It is available by default on Databricks and you can access it here.

A small sample of the dataset
Files from the original Delta table

We have 16 parquet entries and a _delta_log folder containing all the transaction logs with all the changes that stack up to create our delta table.

If we check the contents of the log we can see a JSON file that describes the first transaction that was written when Databricks created this Delta table.

From analysis, we can see that this transaction includes several actions:

Commit info

{
"commitInfo": {
"timestamp": 1602173340340,
"userId": "360903564160648",
"userName": "[email protected]",
"operation": "WRITE",
"operationParameters": {
"mode": "ErrorIfExists",
"partitionBy": "[]"
},
"notebook": {
"notebookId": "1607762315395537"
},
"clusterId": "1008-160338-oil232",
"isolationLevel": "WriteSerializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "8",
"numOutputBytes": "221245652",
"numOutputRows": "10000000"
}
}
}

commitInfo contains all the information regarding the commit: which operation was made, by whom, where, and at what time. The operationMetrics field shows that 8 files were written with a total of 10,000,000 records.
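The metrics in this action can be read straight from the commit file. Below is a minimal sketch using only Python's standard library. The `commit_line` string is a trimmed copy of the commitInfo action shown earlier, and `summarize_commit` is a hypothetical helper name, not part of any Delta API.

```python
import json

# Trimmed copy of the commitInfo action from the first transaction file.
commit_line = (
    '{"commitInfo": {"timestamp": 1602173340340, "operation": "WRITE", '
    '"operationMetrics": {"numFiles": "8", "numOutputBytes": "221245652", '
    '"numOutputRows": "10000000"}}}'
)

def summarize_commit(line: str) -> dict:
    """Return the operation name plus its metrics converted from strings to integers."""
    info = json.loads(line)["commitInfo"]
    metrics = {name: int(value) for name, value in info["operationMetrics"].items()}
    return {"operation": info["operation"], **metrics}

summary = summarize_commit(commit_line)
print(summary)
```

Note that the metrics are stored as strings in the log, so they have to be converted before doing any arithmetic on them.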

Protocol

{
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
}
}

The protocol action is used to increase the version of the Delta protocol that is required to read or write a given table. This allows excluding readers and writers that are on an older protocol version and would lack the features needed to correctly interpret the transaction log.
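To make the idea concrete, here is a rough sketch of the check a client could perform before touching a table. The constants and the `check_protocol` helper are assumptions made for illustration; real clients also have to deal with details such as table features, which this sketch ignores.

```python
import json

# Highest protocol versions this hypothetical client understands.
SUPPORTED_READER_VERSION = 1
SUPPORTED_WRITER_VERSION = 2

def check_protocol(action_line: str) -> tuple:
    """Return (can_read, can_write) for a single protocol action line."""
    protocol = json.loads(action_line)["protocol"]
    can_read = protocol["minReaderVersion"] <= SUPPORTED_READER_VERSION
    can_write = protocol["minWriterVersion"] <= SUPPORTED_WRITER_VERSION
    return can_read, can_write

# The protocol action from our table: this client may read and write it.
ours = check_protocol('{"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}')

# A table that demands newer versions (made-up values): the client must refuse.
newer = check_protocol('{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7}}')

print(ours, newer)
```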

Metadata

{
"metaData": {
"id": "ee2db204-0e38-4962-92b0-83e5570d7cd5",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"firstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"middleName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"lastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthDate\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ssn\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 1602173313568
}
}

The metaData action contains all the table metadata. It is required in the first version of a table, as it contains the table’s definition. Subsequent modifications to the table metadata will produce a new metaData action.

Add

{
"add": {
"path": "part-00000-373539c8-e620-43e4-82e0-5eba22bb3b77-c000.snappy.parquet",
"partitionValues": {},
"size": 27825521,
"modificationTime": 1602173334000,
"dataChange": true,
"stats": "{\"numRecords\":1249744,\"minValues\":{\"id\":3766824,\"firstName\":\"Aaron\",\"middleName\":\"Aaron\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1008\",\"salary\":-20858},\"maxValues\":{\"id\":5016567,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"M\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9985\",\"salary\":180841},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}"
}
}
{
"add": {
"path": "part-00001-943ebb93-8446-4a6c-99f7-1ca12ec2511b-c000.snappy.parquet",
"partitionValues": {},
"size": 27781558,
"modificationTime": 1602173334000,
"dataChange": true,
"stats": "{\"numRecords\":1249537,\"minValues\":{\"id\":1267751,\"firstName\":\"Abbey\",\"middleName\":\"Abbey\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1005\",\"salary\":-20925},\"maxValues\":{\"id\":2517287,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"F\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9981\",\"salary\":165757},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}"
}
}
...

The add action, as the name suggests, is used to modify the data in a table by adding individual logical files. It contains the metadata of the respective file as well as some data statistics that can be used for optimizations that we’ll talk about further down the article.

The log contains 8 add entries, from part-00000 to part-00007, that were truncated for simplicity.
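Since each line of a commit file is a standalone JSON object with a single top-level key naming the action, the log is easy to inspect by hand. The sketch below rebuilds a shortened stand-in for the first commit (only two of the eight add actions, with stats reduced to the salary column) and decodes it; `make_add` and `read_actions` are hypothetical helpers.

```python
import json
from collections import Counter

def make_add(path, size, num_records, min_salary, max_salary):
    """Build an add action line; note that stats is itself a JSON-encoded string."""
    stats = {
        "numRecords": num_records,
        "minValues": {"salary": min_salary},
        "maxValues": {"salary": max_salary},
    }
    return json.dumps({"add": {"path": path, "size": size, "stats": json.dumps(stats)}})

# Shortened stand-in for the first commit file: one action per line.
log_text = "\n".join([
    '{"commitInfo": {"operation": "WRITE"}}',
    '{"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}',
    '{"metaData": {"id": "ee2db204-0e38-4962-92b0-83e5570d7cd5", "partitionColumns": []}}',
    make_add("part-00000-373539c8-e620-43e4-82e0-5eba22bb3b77-c000.snappy.parquet",
             27825521, 1249744, -20858, 180841),
    make_add("part-00001-943ebb93-8446-4a6c-99f7-1ca12ec2511b-c000.snappy.parquet",
             27781558, 1249537, -20925, 165757),
])

def read_actions(text):
    """Yield (action_type, body) pairs, one per non-empty line."""
    for line in text.splitlines():
        if line.strip():
            (kind, body), = json.loads(line).items()
            yield kind, body

actions = list(read_actions(log_text))
counts = Counter(kind for kind, _ in actions)
# The stats field has to be decoded a second time because it is a string.
total_records = sum(
    json.loads(body["stats"])["numRecords"] for kind, body in actions if kind == "add"
)
print(counts, total_records)
```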

If you wish to learn more about Delta’s protocol please refer to: https://github.com/delta-io/delta/blob/master/PROTOCOL.md

Now that we have analyzed the transaction logs and the dataset we are going to copy it to our own directory so that we can modify the table.

Vacuum

The first obvious answer is the VACUUM command. It deletes the files that no longer affect our Delta table, given a configurable delta.deletedFileRetentionDuration that defaults to 7 days.

After analyzing the dataset and the delta log, we found that we had 16 files, all of them older than the default retention interval, but only 8 of them were referenced in the log. This means that in theory if we run the command the other 8 files would be cleaned.

Let’s check the result in the underlying filesystem.

Files after running VACUUM on default settings

Surprisingly, the files were not cleaned up. What happened?

Something that I found surprising is that the timestamps used internally by VACUUM are not the ones recorded in the add actions of the transaction log but rather the modification times of the files in the underlying file system. The reason is to avoid reading a huge number of JSON files to find which files should be selected for deletion. With that said, make sure to keep the files’ modification times intact when copying or migrating Delta tables.
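The selection rule can be sketched in a few lines. This is a simplified model under stated assumptions, not the actual VACUUM implementation: a file is a candidate only if it is not referenced by the current table state and its file system modification time is older than the retention cutoff. The file names and the `vacuum_candidates` helper are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def vacuum_candidates(files_on_disk, referenced_paths, now, retention=timedelta(days=7)):
    """Return unreferenced paths whose modification time is older than the cutoff."""
    cutoff = now - retention
    return sorted(
        path
        for path, modified in files_on_disk.items()
        if path not in referenced_paths and modified < cutoff
    )

now = datetime(2023, 2, 12, tzinfo=timezone.utc)
original_write = datetime(2020, 10, 8, tzinfo=timezone.utc)

# 8 files referenced by the log and 8 leftovers (hypothetical names).
referenced = {f"part-0000{i}-live.parquet" for i in range(8)}
stale = {f"part-0000{i}-stale.parquet" for i in range(8)}

# Case 1: modification times were preserved, so the leftovers are old enough.
preserved = {path: original_write for path in referenced | stale}
# Case 2: the table was just copied, so every file looks brand new.
copied = {path: now for path in referenced | stale}

with_preserved_times = vacuum_candidates(preserved, referenced, now)
after_fresh_copy = vacuum_candidates(copied, referenced, now)
print(len(with_preserved_times), len(after_fresh_copy))
```

In the second case nothing qualifies, which matches what we observed after copying the dataset.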

Given that we just copied the entire dataset, every file’s modification time is the time of the copy, so none of them will be selected for removal until 7 days have passed. If we try to run VACUUM with a shorter retention interval, we’ll get the following warning:

For testing purposes, we are setting spark.databricks.delta.retentionDurationCheck.enabled to false so that we can demonstrate the command in action, but this should be used with caution, as it risks corrupting the table if any other active reader or writer depends on the data that is being deleted.

Files after VACUUM

Voilà, everything looks cleaner now. What about the transaction log? There are 4 new JSON files, each representing a new transaction.

Every time a VACUUM is requested, two new commits are generated in the transaction log, containing VACUUM START and VACUUM END operations, respectively.

{
"commitInfo": {
"timestamp": 1676202617353,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "VACUUM START",
"operationParameters": {
"retentionCheckEnabled": true,
"defaultRetentionMillis": 604800000
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 0,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": true,
"operationMetrics": {
"numFilesToDelete": "0"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "6b875d5e-4c0e-4724-a87b-a0a6bbfd8419"
}
}

The first one did not affect any files, hence the numFilesToDelete is 0.

{
"commitInfo": {
"timestamp": 1676206833338,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "VACUUM START",
"operationParameters": {
"retentionCheckEnabled": false,
"specifiedRetentionMillis": 0,
"defaultRetentionMillis": 604800000
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 2,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": true,
"operationMetrics": {
"numFilesToDelete": "8"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "42f93d56-8739-46d5-a8f9-c2c1daffe0ec"
}
}

The second one marked 8 files for deletion, hence numFilesToDelete is 8.

In sum, VACUUM jobs are a must for storage cost reduction. However, we need to schedule them regularly (they don’t affect any running jobs), as they are not scheduled by default. In addition, we need to set the retention value to cover as long as we’d want to time travel, and keep the files’ modification times in mind when migrating Delta tables.
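For a scheduled job, the statement can be assembled from a couple of parameters. The sketch below only builds the SQL text; the table name `people10m_copy` and the `vacuum_statement` helper are assumptions, and in a notebook the resulting string would typically be passed to `spark.sql(...)`, which is not exercised here.

```python
from typing import Optional

def vacuum_statement(table: str, retain_hours: Optional[int] = None, dry_run: bool = False) -> str:
    """Build a VACUUM statement with an optional retention window and DRY RUN flag."""
    parts = [f"VACUUM {table}"]
    if retain_hours is not None:
        if retain_hours < 0:
            raise ValueError("retain_hours must not be negative")
        parts.append(f"RETAIN {retain_hours} HOURS")
    if dry_run:
        # DRY RUN lists the files that would be deleted without deleting them.
        parts.append("DRY RUN")
    return " ".join(parts)

# 168 hours is the 7-day default (604800000 ms in the commit info above).
preview = vacuum_statement("people10m_copy", retain_hours=168, dry_run=True)
default = vacuum_statement("people10m_copy")
print(preview)
print(default)
```

Running with DRY RUN first is a cheap way to confirm which files are about to disappear before committing to the deletion.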

Optimize

The next command we need to be aware of is OPTIMIZE. It compacts small files into larger ones while keeping all the data intact and recalculating the Delta statistics. It can greatly improve query performance, especially if the data is written by a streaming job, where, depending on the trigger interval, a lot of small files can be generated.

The target file size can be changed by tweaking delta.targetFileSize. Keep in mind that setting this value does not guarantee that all the files will end up with the specified size. The operation makes a best-effort attempt to honor the target size, but the result depends heavily on the amount of data being processed as well as the parallelism.

In this example, we’ll set it to 80MB, since the dataset is much smaller than the default size which is 1GB.
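To build some intuition for how a target size turns many small files into a few larger ones, here is a toy greedy bin-packing sketch. It is an illustration only and not the algorithm OPTIMIZE actually uses; the eight equal input sizes are an approximation (the total written by the first commit divided by 8), and real output sizes will differ because the data is rewritten and recompressed.

```python
def plan_compaction(file_sizes, target_size):
    """Greedily group consecutive files so that each group stays within target_size."""
    bins, current, current_size = [], [], 0
    for size in file_sizes:
        # Close the current group when adding this file would exceed the target.
        if current and current_size + size > target_size:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins

TARGET = 80 * 1024 * 1024        # 80MB expressed in bytes
sizes = [27_655_706] * 8         # approximate size of each original file

bins = plan_compaction(sizes, TARGET)
files_per_bin = [len(group) for group in bins]
print(files_per_bin)
```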

Let’s analyze the transaction log commit written after we ran the command:

{
"commitInfo": {
"timestamp": 1676215176645,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "OPTIMIZE",
"operationParameters": {
"predicate": "[]",
"zOrderBy": "[]",
"batchId": "0",
"auto": false
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 2,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": false,
"operationMetrics": {
"numRemovedFiles": "8",
"numRemovedBytes": "221245652",
"p25FileSize": "59403028",
"minFileSize": "59403028",
"numAddedFiles": "3",
"maxFileSize": "88873012",
"p75FileSize": "88873012",
"p50FileSize": "87441438",
"numAddedBytes": "235717478"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "55389d3e-4dd5-43a9-b5e1-de67cde8bb72"
}
}

A total of 8 files were removed, and 3 were added. Our new target file size is 80MB so all of our files were compacted into three new ones. As the commit info shows, the log also contains 8 remove actions and three add actions that were omitted for simplicity.

You might be wondering if the OPTIMIZE command really did something useful in this specific dataset so let’s try and run a simple query.

With OPTIMIZE we have improved the scan time since we read fewer files. Nevertheless, we are still reading the whole dataset while trying to find salaries that are greater than 80000. We will tackle this issue in the next section of the article.

In sum, one should schedule OPTIMIZE jobs regularly since query reads can heavily benefit from having fewer files to read. Databricks recommends running it daily, but it really depends on the frequency of the updates. Keep in mind that OPTIMIZE can take some time and will increase processing costs.

Z-Order Optimize

Z-Ordering is a technique that is used to colocate related information in the same set of files.

When files are written to a Delta table, min, max, and count statistics are automatically added in a stats field on the add action, as we’ve seen before. These statistics are used for data-skipping when querying the table. Data-skipping is an optimization that targets queries containing WHERE clauses. By default, the first 32 columns of the dataset have their statistics collected. This can be changed by setting delta.dataSkippingNumIndexedCols to the desired number. Keep in mind that collecting statistics can affect write performance, especially for long string columns; it is advised to move those to the end of the schema and set the property to a number lower than their index.

In the OPTIMIZE example, we’ve seen that even though we have these statistics collected we can’t really make use of them and still end up reading all the files. That’s because we don’t have any explicit ordering and the salaries are basically randomized between all files.

By adding a ZORDER BY column to OPTIMIZE we can easily solve this issue:

Let’s analyze the transaction log:

{
"commitInfo": {
"timestamp": 1676217320722,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "OPTIMIZE",
"operationParameters": {
"predicate": "[]",
"zOrderBy": "[\"salary\"]",
"batchId": "0",
"auto": false
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 2,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": false,
"operationMetrics": {
"numRemovedFiles": "8",
"numRemovedBytes": "221245652",
"p25FileSize": "113573613",
"minFileSize": "113573613",
"numAddedFiles": "2",
"maxFileSize": "123467314",
"p75FileSize": "123467314",
"p50FileSize": "123467314",
"numAddedBytes": "237040927"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "0e9b6467-9385-42fa-bc1a-df5486fc997f"
}
}

There are some differences between the two OPTIMIZE commands. The first thing we notice is that, as expected, we now have a zOrderBy column in operationParameters. Moreover, even though we specified the same target file size, the OPTIMIZE resulted in 2 files instead of 3, due to the statistics of our column.
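As a rough sketch of how the two OPTIMIZE variants compared here might be issued, the helpers below assemble the SQL text. The table name `people10m_copy` and both function names are assumptions; the strings would typically be passed to `spark.sql(...)` in a notebook, which is not exercised here.

```python
def set_target_file_size_statement(table: str, size_bytes: int) -> str:
    """Build the ALTER TABLE statement that sets delta.targetFileSize."""
    return (
        f"ALTER TABLE {table} "
        f"SET TBLPROPERTIES ('delta.targetFileSize' = '{size_bytes}')"
    )

def optimize_statement(table: str, where: str = "", zorder_by=()) -> str:
    """Build an OPTIMIZE statement with optional WHERE and ZORDER BY clauses."""
    parts = [f"OPTIMIZE {table}"]
    if where:
        # The predicate is meant to restrict the run to a subset of partitions.
        parts.append(f"WHERE {where}")
    if zorder_by:
        parts.append("ZORDER BY (" + ", ".join(zorder_by) + ")")
    return " ".join(parts)

target = set_target_file_size_statement("people10m_copy", 80 * 1024 * 1024)
plain = optimize_statement("people10m_copy")
zordered = optimize_statement("people10m_copy", zorder_by=["salary"])
print(target)
print(plain)
print(zordered)
```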

Below is the add action for the first file. The stats show that this file contains all the records with salaries between -26884 and 73676. With that said, our query should skip this file entirely, since its salary range falls outside the range of our WHERE clause.

{
"add": {
"path": "part-00000-edb01f4d-18f1-4c82-ac18-66444343df9b-c000.snappy.parquet",
"partitionValues": {},
"size": 123467314,
"modificationTime": 1676217320000,
"dataChange": false,
"stats": "{\"numRecords\":5206176,\"minValues\":{\"id\":1,\"firstName\":\"Aaron\",\"middleName\":\"Aaron\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1010\",\"salary\":-26884},\"maxValues\":{\"id\":9999999,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"M\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9989\",\"salary\":73676},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}",
"tags": {
"INSERTION_TIME": "1602173334000000",
"ZCUBE_ZORDER_CURVE": "hilbert",
"ZCUBE_ZORDER_BY": "[\"salary\"]",
"ZCUBE_ID": "493cfedf-fdaf-4d34-a911-b4663adefec7",
"OPTIMIZE_TARGET_SIZE": "83886080"
}
}
}

By running the query again after Z-Ordering the files, we can see that only one file was read and the other one was pruned.
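The pruning decision itself is simple once the statistics are available. The following sketch applies a `salary > 80000` filter to per-file min/max values: the "before" ranges come from the two original add actions shown earlier, the first "after" range comes from the Z-Ordered add action, and the range of the second Z-Ordered file is an assumption since its stats are not shown. `files_to_read` is a hypothetical helper, not a Delta API.

```python
def files_to_read(files, column, threshold):
    """For a `column > threshold` filter, keep only files whose max can satisfy it."""
    return [f["path"] for f in files if f["max"][column] > threshold]

# Before Z-Ordering: salaries are spread over every file, so ranges overlap.
before = [
    {"path": "part-00000", "min": {"salary": -20858}, "max": {"salary": 180841}},
    {"path": "part-00001", "min": {"salary": -20925}, "max": {"salary": 165757}},
]

# After Z-Ordering by salary: each file covers a narrow, mostly disjoint range.
after = [
    {"path": "zordered-part-00000", "min": {"salary": -26884}, "max": {"salary": 73676}},
    # Assumed range for the second file; its stats are not shown in the article.
    {"path": "zordered-part-00001", "min": {"salary": 73676}, "max": {"salary": 180841}},
]

read_before = files_to_read(before, "salary", 80000)
read_after = files_to_read(after, "salary", 80000)
print(read_before, read_after)
```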

Even though Z-Ordering for data-skipping looks to be a game changer, it must be used correctly in order to be efficient. Below we’ll list some key considerations that we must have when using Z-Ordering:

  1. Z-Ordering is only suited for columns with high cardinality; on a low-cardinality column we cannot benefit from data-skipping.
  2. We can specify multiple columns in a Z-Order, but the effectiveness of its data-skipping decreases with each extra column.
  3. Make sure to Z-Order only on columns for which statistics are available. Keep in mind the position of the columns in the schema, since by default only the first 32 columns are analyzed.
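The second consideration can be illustrated with the classic Z-order (Morton) curve, which interleaves the bits of the columns. This is a toy sketch of the general idea only: the add action shown earlier is tagged with a hilbert curve, so Databricks may well use a different curve internally. The grid of points and the helper names are made up for the example.

```python
def morton(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x (even positions) and y (odd positions)."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code

def x_ranges(sorted_points, files=4):
    """Split points into equally sized files and report each file's (min, max) of x."""
    size = len(sorted_points) // files
    chunks = [sorted_points[i * size:(i + 1) * size] for i in range(files)]
    return [(min(p[0] for p in chunk), max(p[0] for p in chunk)) for chunk in chunks]

# A 4x4 grid of (x, y) values written into 4 files of 4 rows each.
points = [(x, y) for x in range(4) for y in range(4)]

by_x_only = x_ranges(sorted(points))                                # ordered by x alone
by_x_and_y = x_ranges(sorted(points, key=lambda p: morton(*p)))     # Z-Ordered on x and y
print(by_x_only)
print(by_x_and_y)
```

When ordering by x alone, each file holds a single x value, so a filter on x reads one file. When the ordering is shared with y, each file spans two x values and the same filter reads two files; in exchange, filters on y can now skip files too.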

Partitioning

Another technique that can be used is physical partitioning. While Z-ordering groups data with similar values under the same file, partitioning groups data files under the same folder.

Contrary to Z-Ordering, partitioning works best with low-cardinality columns. If we choose otherwise, we may end up with a very large number of partitions, each holding a lot of small files, which in the end results in performance issues.

We’ll be using gender as our partition column since it is the only one with low cardinality present in this dataset.

By doing this we end up with two folders, one for each gender. This type of segregation is rather useful for columns that have low cardinality, and are very often used in WHERE clauses in large tables.
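The resulting layout can be pictured with a small sketch that groups rows into Hive-style `column=value` folders, which is how the partition folders are named on disk. The sample rows are made up for illustration and `partition_folders` is a hypothetical helper.

```python
from collections import defaultdict

def partition_folders(rows, column):
    """Group rows by the partition column using Hive-style folder names."""
    folders = defaultdict(list)
    for row in rows:
        folders[f"{column}={row[column]}"].append(row)
    return dict(folders)

# Made-up sample rows, not taken from the dataset.
rows = [
    {"id": 1, "gender": "F", "salary": 81000},
    {"id": 2, "gender": "M", "salary": 56000},
    {"id": 3, "gender": "F", "salary": 47000},
]

layout = partition_folders(rows, "gender")
folder_names = sorted(layout)
print(folder_names)
```

A query filtering on `gender = 'F'` only needs to list the matching folder; the other one is never opened.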

Let’s suppose we now want to be able to extract insights based on gender and salary.

OPTIMIZE can be paired with a partition column if we only want to optimize a subset of the data. Below we’ll analyze data-skipping with and without partitioning in Z-Ordered tables to show how we can take advantage of both approaches. We’ve decreased the target file size to showcase the differences now that our data is split by gender into different files.

As shown above, without partitioning we have to read two files to get our results. We were able to skip 3 files by having the table Z-Ordered by salary but had to fully read the remaining two to extract the requested gender. With partitioning, we were able to skip a full partition, which filtered the gender basically for free, and 3 more files due to the Z-Ordering.
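The two pruning steps can be modelled together in a short sketch. All file names and salary maxima below are hypothetical, chosen only so that the counts resemble the situation described; `plan_scan` is not a Delta API.

```python
def plan_scan(files, gender, min_salary):
    """Classify files for a `gender = ... AND salary > ...` query."""
    to_read, skipped_by_partition, skipped_by_stats = [], [], []
    for f in files:
        # Unpartitioned files carry no partition value, so they can never be pruned here.
        if f["partition"].get("gender", gender) != gender:
            skipped_by_partition.append(f["path"])
        elif f["max_salary"] <= min_salary:
            skipped_by_stats.append(f["path"])
        else:
            to_read.append(f["path"])
    return to_read, skipped_by_partition, skipped_by_stats

# Z-Ordered by salary, no partitioning (hypothetical max salary per file).
unpartitioned = [
    {"path": f"part-{i}", "partition": {}, "max_salary": m}
    for i, m in enumerate([40_000, 60_000, 80_000, 120_000, 180_000])
]

# Partitioned by gender, then Z-Ordered by salary inside each partition.
partitioned = [
    {"path": f"gender={g}/part-{i}", "partition": {"gender": g}, "max_salary": m}
    for g in ("F", "M")
    for i, m in enumerate([40_000, 60_000, 80_000, 180_000])
]

plain = plan_scan(unpartitioned, "F", 80_000)
combined = plan_scan(partitioned, "F", 80_000)
print([len(group) for group in plain], [len(group) for group in combined])
```

In the unpartitioned case every file that survives the salary check still has to be scanned for the requested gender, whereas in the partitioned case the other gender's folder is discarded before any statistics are consulted.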

As we can see, there are benefits to using both approaches simultaneously but it needs to be thoroughly thought through as it might only make a significant difference for very large tables.

In conclusion, keeping Delta tables clean is crucial for maintaining the performance and efficiency of data pipelines. Vacuuming and optimizing Delta tables can help reclaim storage space and improve query execution times. Understanding the small details of each operation is very important to proper fine-tuning which could otherwise lead to unnecessary storage and processing costs.

https://docs.databricks.com/delta/vacuum.html

https://docs.databricks.com/delta/optimize.html

https://docs.databricks.com/delta/data-skipping.html

https://docs.databricks.com/tables/partitions.html

https://docs.databricks.com/delta/best-practices.html

https://www.databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html


Simplified flow chart on how to keep Delta tables fast and clean (Image by Author)

Keeping Delta tables fast and clean is important for maintaining the efficiency of data pipelines. Delta tables can grow very large over time, leading to slow query performance and increased storage costs. However, there are several operations and trade-offs that can positively influence the speed of the tables.

In this blog post, we’ll use the people10m public dataset that is available on Databricks Community Edition, to showcase the best practices on how to keep the tables fast and clean using Delta operations while explaining what is happening behind the scenes.

We will start by inspecting what is inside the dataset. It is available by default on Databricks and you can access it here.

A small sample of the dataset
Files from the original Delta table

We have 16 parquet entries and a _delta_log folder containing all the transaction logs with all the changes that stack up to create our delta table.

If we check the contents of the log we can see a JSON file that describes the first transaction that was written when Databricks created this Delta table.

From analysis, we can see that this transaction includes several actions:

Commit info

{
"commitInfo": {
"timestamp": 1602173340340,
"userId": "360903564160648",
"userName": "[email protected]",
"operation": "WRITE",
"operationParameters": {
"mode": "ErrorIfExists",
"partitionBy": "[]"
},
"notebook": {
"notebookId": "1607762315395537"
},
"clusterId": "1008-160338-oil232",
"isolationLevel": "WriteSerializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "8",
"numOutputBytes": "221245652",
"numOutputRows": "10000000"
}
}
}

commitInfo contains all the information regarding the commit: which operation was made, by who, where, and at what time. TheoperationMetrics field shows that 8 files were written with a total of 1000000 records.

Protocol

{
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
}
}

The protocol action is used to increase the version of the Delta protocol that is required to read or write a given table. This allows excluding readers/writers that are on an old protocol and would miss the necessary features to correctly interpret the transaction log.

Metadata

{
"metaData": {
"id": "ee2db204-0e38-4962-92b0-83e5570d7cd5",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"firstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"middleName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"lastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthDate\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ssn\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 1602173313568
}
}

The metadata action contains all the table metadata. It is required on the first action of a table as it contains its own definition. Subsequent modifications to the table metadata will originate a new action.

Add

{
"add": {
"path": "part-00000-373539c8-e620-43e4-82e0-5eba22bb3b77-c000.snappy.parquet",
"partitionValues": {},
"size": 27825521,
"modificationTime": 1602173334000,
"dataChange": true,
"stats": "{\"numRecords\":1249744,\"minValues\":{\"id\":3766824,\"firstName\":\"Aaron\",\"middleName\":\"Aaron\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1008\",\"salary\":-20858},\"maxValues\":{\"id\":5016567,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"M\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9985\",\"salary\":180841},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}"
}
}
{
"add": {
"path": "part-00001-943ebb93-8446-4a6c-99f7-1ca12ec2511b-c000.snappy.parquet",
"partitionValues": {},
"size": 27781558,
"modificationTime": 1602173334000,
"dataChange": true,
"stats": "{\"numRecords\":1249537,\"minValues\":{\"id\":1267751,\"firstName\":\"Abbey\",\"middleName\":\"Abbey\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1005\",\"salary\":-20925},\"maxValues\":{\"id\":2517287,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"F\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9981\",\"salary\":165757},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}"
}
}
...

The add action, as the name suggests, is used to modify the data in a table by adding individual logical files. It contains the metadata of the respective file as well as some data statistics that can be used for optimizations that we’ll talk about further down the article.

The log contains 8 add entries, from part-00000 to part-00007, that were truncated for simplicity.

If you wish to learn more about Delta’s protocol please refer to: https://github.com/delta-io/delta/blob/master/PROTOCOL.md

Now that we have analyzed the transaction logs and the dataset we are going to copy it to our own directory so that we can modify the table.

Vacuum

The first obvious answer is the VACUUM command. What this does is delete the files that no longer affect our Delta table, given a configurable delta.deletedFileRetentionDuration that defaults on 7 days.

After analyzing the dataset and the delta log, we found that we had 16 files, all of them older than the default retention interval, but only 8 of them were referenced in the log. This means that in theory if we run the command the other 8 files would be cleaned.

Let’s check the result in the underlying filesystem.

Files after running VACUUM on default settings

Surprisingly, the files were not cleaned up. What happened?

Something that I found surprising is that the timestamps that are internally used by VACUUM are not the ones referenced in the transaction log files in the add action but rather the modificationTime of the files. The reason for that is to avoid reading a huge number of JSON files to find what files should be selected for deletion. With that said, make sure to keep modificationTime intact when copying/migrating Delta tables.

Given that we just copied the entire dataset, the modificationTime is as of now and it won’t be selected for removal, at least until 7 days have passed. If we try to do so we’ll get the following warning:

For testing purposes, we are setting delta.retentionDurationCheck.enable=false so that we can demonstrate the command in action but it’s something that should be used with caution as it risks corrupting the table if any other active reader or writer depends on the data that is being deleted.

Files after VACUUM

Voilá, everything looks cleaner now. What about the transaction log? There are 4 new JSON files, each representing a new transaction.

Every time a VACUUM is requested, two new commits are generated in the transaction log, containing VACUUM START andVACUUM END operations, respectively.

{
"commitInfo": {
"timestamp": 1676202617353,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "VACUUM START",
"operationParameters": {
"retentionCheckEnabled": true,
"defaultRetentionMillis": 604800000
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 0,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": true,
"operationMetrics": {
"numFilesToDelete": "0"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "6b875d5e-4c0e-4724-a87b-a0a6bbfd8419"
}
}

The first one did not affect any files, hence the numFilesToDelete is 0.

{
"commitInfo": {
"timestamp": 1676206833338,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "VACUUM START",
"operationParameters": {
"retentionCheckEnabled": false,
"specifiedRetentionMillis": 0,
"defaultRetentionMillis": 604800000
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 2,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": true,
"operationMetrics": {
"numFilesToDelete": "8"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "42f93d56-8739-46d5-a8f9-c2c1daffe0ec"
}
}

The second one marked 8 files for deletion hence numFilesToDelete is 8.

In sum, VACUUMjobs are a must for storage cost reduction. However, we need to make sure to schedule them regularly (they don’t affect any running jobs), as they are not scheduled by default. In addition to this, we need to make sure to tweak the retention value for as long as we’d want to time travel and have the modificationTime in mind when migrating Delta tables.

Optimize

The next command we need to be aware of is OPTIMIZE. What this does is compact small files into larger ones, while keeping all the data intact and delta statistics re-calculated. It can greatly improve query performance, especially if the data is written using a Streaming job, where, depending on the trigger interval, a lot of small files can be generated.

The target file size can be changed by tweaking delta.targetFileSize. Have in mind that setting this value does not guarantee that all the files will end up with the specified size. The operation will make a best-effort attempt to be true to the target size but it heavily depends on the amount of data we’re processing as well as the parallelism.

In this example, we’ll set it to 80MB, since the dataset is much smaller than the default size which is 1GB.

Let’s analyze the transaction log commit after what happened after we run the command:

{
"commitInfo": {
"timestamp": 1676215176645,
"userId": "8019820830300763",
"userName": "vitor",
"operation": "OPTIMIZE",
"operationParameters": {
"predicate": "[]",
"zOrderBy": "[]",
"batchId": "0",
"auto": false
},
"notebook": {
"notebookId": "1087108890280137"
},
"clusterId": "0102-173902-b3a5lq4t",
"readVersion": 2,
"isolationLevel": "SnapshotIsolation",
"isBlindAppend": false,
"operationMetrics": {
"numRemovedFiles": "8",
"numRemovedBytes": "221245652",
"p25FileSize": "59403028",
"minFileSize": "59403028",
"numAddedFiles": "3",
"maxFileSize": "88873012",
"p75FileSize": "88873012",
"p50FileSize": "87441438",
"numAddedBytes": "235717478"
},
"engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
"txnId": "55389d3e-4dd5-43a9-b5e1-de67cde8bb72"
}
}

A total of 8 files were removed, and 3 were added. Our new target file size is 80MB so all of our files were compacted into three new ones. As the commit info shows, the log also contains 8 remove actions and three add actions that were omitted for simplicity.

You might be wondering if the OPTIMIZE command really did something useful in this specific dataset so let’s try and run a simple query.

With OPTIMIZE we have improved the scan time since we read fewer files. Nevertheless, we are still reading the whole dataset while trying to find salaries that are greater than 80000. We will tackle this issue in the next section of the article.

In sum, one should schedule OPTIMIZE jobs regularly since query reads can heavily benefit from having fewer files to read. Databricks recommends running it daily but it really depends on the frequency of the updates. Have in mind that OPTIMIZE can take some time and will increase processing costs.

Z-Order Optimize

Z-Ordering is a technique that is used to colocate related information in the same set of files.

When files are written to a Delta table, min, max, and count statistics are automatically added in a stats field on the add action as we’ve seen before. These statistics are used for data-skipping when querying the table. Data-skipping is an optimization that aims to optimize queries containing WHERE clauses. By default, the first 32 columns of the dataset have their statistics collected. It can be changed by tweaking delta.dataSkippingNumIndexedCols to the desired number. Have in mind that this can affect write performance, especially for long strings for which it is advised to move them to the end of the schema and set the property to a number lower than its index.

In the OPTIMIZE example, we’ve seen that even though we have these statistics collected we can’t really make use of them and still end up reading all the files. That’s because we don’t have any explicit ordering and the salaries are basically randomized between all files.

By adding a ZORDER-BY column with OPTIMIZE we can easily solve this issue:

Let’s analyze the transaction log:

{
  "commitInfo": {
    "timestamp": 1676217320722,
    "userId": "8019820830300763",
    "userName": "vitor",
    "operation": "OPTIMIZE",
    "operationParameters": {
      "predicate": "[]",
      "zOrderBy": "[\"salary\"]",
      "batchId": "0",
      "auto": false
    },
    "notebook": {
      "notebookId": "1087108890280137"
    },
    "clusterId": "0102-173902-b3a5lq4t",
    "readVersion": 2,
    "isolationLevel": "SnapshotIsolation",
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "8",
      "numRemovedBytes": "221245652",
      "p25FileSize": "113573613",
      "minFileSize": "113573613",
      "numAddedFiles": "2",
      "maxFileSize": "123467314",
      "p75FileSize": "123467314",
      "p50FileSize": "123467314",
      "numAddedBytes": "237040927"
    },
    "engineInfo": "Databricks-Runtime/11.3.x-scala2.12",
    "txnId": "0e9b6467-9385-42fa-bc1a-df5486fc997f"
  }
}

There are some differences between the two OPTIMIZE commands. The first thing we notice is that, as expected, operationParameters now has a zOrderBy entry listing salary. Moreover, even though we specified the same target file size, this OPTIMIZE resulted in 2 files instead of 3, due to the statistics of our column.

Below is the add action for the first file. The stats show that this file holds the records with salaries between -26884 and 73676. Our query should therefore skip this file entirely, since its salary range falls outside the range selected by our WHERE clause.

{
  "add": {
    "path": "part-00000-edb01f4d-18f1-4c82-ac18-66444343df9b-c000.snappy.parquet",
    "partitionValues": {},
    "size": 123467314,
    "modificationTime": 1676217320000,
    "dataChange": false,
    "stats": "{\"numRecords\":5206176,\"minValues\":{\"id\":1,\"firstName\":\"Aaron\",\"middleName\":\"Aaron\",\"lastName\":\"A'Barrow\",\"gender\":\"F\",\"birthDate\":\"1951-12-31T05:00:00.000Z\",\"ssn\":\"666-10-1010\",\"salary\":-26884},\"maxValues\":{\"id\":9999999,\"firstName\":\"Zulma\",\"middleName\":\"Zulma\",\"lastName\":\"Zywicki\",\"gender\":\"M\",\"birthDate\":\"2000-01-30T05:00:00.000Z\",\"ssn\":\"999-98-9989\",\"salary\":73676},\"nullCount\":{\"id\":0,\"firstName\":0,\"middleName\":0,\"lastName\":0,\"gender\":0,\"birthDate\":0,\"ssn\":0,\"salary\":0}}",
    "tags": {
      "INSERTION_TIME": "1602173334000000",
      "ZCUBE_ZORDER_CURVE": "hilbert",
      "ZCUBE_ZORDER_BY": "[\"salary\"]",
      "ZCUBE_ID": "493cfedf-fdaf-4d34-a911-b4663adefec7",
      "OPTIMIZE_TARGET_SIZE": "83886080"
    }
  }
}

By running the query again after Z-Ordering the files, we can see that only one file was read and the other one was pruned.
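The pruning decision can be sketched in a few lines of Python. This is a simplified illustration rather than the engine’s actual logic: the first file’s salary range comes from the add action above, while the second file’s stats, the can_skip helper, and the salary > 80000 predicate are assumptions.

```python
import json

# Trimmed stats of the first file from the add action above (salary only).
FIRST_FILE_STATS = json.dumps({
    "numRecords": 5206176,
    "minValues": {"salary": -26884},
    "maxValues": {"salary": 73676},
})

# Hypothetical stats for the second file; its real values are not shown here.
SECOND_FILE_STATS = json.dumps({
    "minValues": {"salary": 73677},
    "maxValues": {"salary": 200000},
})


def can_skip(stats_json, column, lower_bound):
    """True if no row in the file can satisfy `column > lower_bound`."""
    stats = json.loads(stats_json)
    return stats["maxValues"][column] <= lower_bound


threshold = 80000  # assumed predicate: WHERE salary > 80000
for name, stats in [("file 1", FIRST_FILE_STATS), ("file 2", SECOND_FILE_STATS)]:
    print(name, "skipped" if can_skip(stats, "salary", threshold) else "read")
```

Because the decision only needs the min and max values stored in the transaction log, a file can be ruled out without opening it.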

Even though Z-Ordering for data-skipping looks like a game changer, it must be used correctly in order to be efficient. Below are some key considerations to keep in mind when using Z-Ordering:

  1. Z-Ordering is only suited for columns with high cardinality; with a low-cardinality column we cannot benefit from data-skipping.
  2. We can specify multiple columns in ZORDER BY, but the effectiveness of data-skipping decreases with each extra column.
  3. Make sure to Z-Order only on columns for which statistics are available. Keep in mind the position of each column in the schema, since by default only the first 32 columns have statistics collected.
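One way to build intuition for the second point is to look at how a Z-order (Morton) value is formed by interleaving the bits of the columns. This is a simplified sketch: the function name is made up, it handles only two small non-negative integers, and the tags in the add action above indicate that Databricks used a Hilbert curve for this table rather than this exact encoding.

```python
def morton_key(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one sort key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # bits of x go to even positions
        key |= ((y >> i) & 1) << (2 * i + 1)  # bits of y go to odd positions
    return key


# Sorting by the key keeps rows that are close in both dimensions near
# each other, but each column only controls every other bit of the order.
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: morton_key(*p))
print(ordered[:4])
```

With a single column, the sort order follows that column alone, so each file can cover a narrow range of it. Once the order is shared between columns, the per-column min/max range of each file tends to widen, which makes skipping less selective.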

Partitioning

Another technique that can be used is physical partitioning. While Z-Ordering groups data with similar values under the same file, partitioning groups data files under the same folder.

Contrary to Z-Ordering, partitioning works best with low-cardinality columns. If we pick a high-cardinality column instead, we may end up with a huge number of partitions, each holding many small files, which ultimately results in performance issues.

We’ll be using gender as our partition column since it is the only one with low cardinality present in this dataset.

By doing this we end up with two folders, one for each gender. This type of segregation is useful for low-cardinality columns that are very often used in WHERE clauses on large tables.
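The write itself is not shown here, but the resulting layout can be sketched in plain Python. The rows and the helper partition_folder are made up; the only thing taken from Delta is the default column=value naming of partition folders.

```python
from collections import defaultdict

# Made-up sample rows: (firstName, gender, salary).
rows = [
    ("Ana", "F", 52000),
    ("Bruno", "M", 61000),
    ("Carla", "F", 87000),
    ("Diogo", "M", 45000),
]


def partition_folder(column, value):
    """Folder name for one partition value (hypothetical helper)."""
    return f"{column}={value}"


folders = defaultdict(list)
for first_name, gender, salary in rows:
    folders[partition_folder("gender", gender)].append((first_name, salary))

# One folder per distinct gender value.
print(sorted(folders))

# A query with WHERE gender = 'F' only needs to look inside one folder.
print(folders["gender=F"])
```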

Let’s suppose we now want to be able to extract insights based on gender and salary.

OPTIMIZE can be paired with a predicate on the partition column if we only want to optimize a subset of the data. Below we’ll analyze data-skipping with and without partitioning in Z-Ordered tables to show how we can take advantage of both approaches. We’ve decreased the target file size to showcase the differences now that our data is split by gender across different files.

As shown above, without partitioning we have to read two files to get our results. Z-Ordering by salary let us skip 3 files, but the remaining two had to be read in full to filter on the requested gender. With partitioning, we were able to skip a full partition, which filtered by gender basically for free, and 3 files thanks to the Z-Ordering.
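As a rough illustration of the mechanism, the sketch below combines both pruning steps over a made-up file listing. Every number and name in it is hypothetical and does not reproduce the file counts above; the query is assumed to filter on gender = 'F' and salary > 80000.

```python
# Made-up file metadata: partition value plus salary min/max statistics.
files = [
    {"path": "gender=F/part-0", "gender": "F", "min": -20000, "max": 40000},
    {"path": "gender=F/part-1", "gender": "F", "min": 40001, "max": 75000},
    {"path": "gender=F/part-2", "gender": "F", "min": 75001, "max": 180000},
    {"path": "gender=M/part-0", "gender": "M", "min": -25000, "max": 70000},
    {"path": "gender=M/part-1", "gender": "M", "min": 70001, "max": 190000},
]


def files_to_read(files, gender, min_salary):
    """Keep files that may hold rows with this gender and salary > min_salary."""
    # Partition pruning: drop every file outside the requested partition.
    in_partition = [f for f in files if f["gender"] == gender]
    # Data-skipping: drop files whose max salary cannot satisfy the filter.
    return [f for f in in_partition if f["max"] > min_salary]


selected = files_to_read(files, "F", 80000)
print([f["path"] for f in selected])
```

In this toy listing, the partition filter removes the whole gender=M folder before any statistics are consulted, and the salary statistics then narrow the remaining files down to one.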

As we can see, there are benefits to using both approaches simultaneously, but the choice needs careful thought, as it might only make a significant difference for very large tables.

In conclusion, keeping Delta tables clean is crucial for maintaining the performance and efficiency of data pipelines. Vacuuming and optimizing Delta tables can help reclaim storage space and improve query execution times. Understanding the details of each operation is essential for proper fine-tuning; without it, we risk unnecessary storage and processing costs.

https://docs.databricks.com/delta/vacuum.html

https://docs.databricks.com/delta/optimize.html

https://docs.databricks.com/delta/data-skipping.html

https://docs.databricks.com/tables/partitions.html

https://docs.databricks.com/delta/best-practices.html

https://www.databricks.com/blog/2018/07/31/processing-petabytes-of-data-in-seconds-with-databricks-delta.html
