Leveraging Azure Event Grid to Create a Java Iceberg Table
by Jean-Claude Cote, January 2023


Photo by Jackson Case on Unsplash

In our previous article, we demonstrated how an Iceberg table can act as a Kafka topic. We showed that independent Java Writers can produce parquet files in parallel, while a single Bookkeeper attaches these data files to an Iceberg table. The Bookkeeper did this by creating an Iceberg commit.

The Bookkeeper needs to identify what the newly created data files are. It then registers these files with the Iceberg table. In our previous article we kept things simple and chose to implement a file-based messaging channel. For every data file created, the Writers create a Moniker in a well-known folder. The Bookkeeper monitors this well-known folder and periodically reads the Monikers. The Bookkeeper uses the information in these Monikers to append the new data files to the Iceberg table.

The previous solution is entirely file-based. It does not rely on any external service. However, the file-based solution requires many additional file create/delete operations and depends on costly data lake list-files operations.

Here’s a high level diagram of our previous file-based solution.

All images unless otherwise noted are by the author

In this article we will attempt to improve on this solution by leveraging the Azure Event Grid service. Event Grid is an event broker that you can use to implement an event-driven architecture, in which events are used to transmit state changes.

The state changes we are interested in transmitting are the file creation events when new files are added to the data lake by the Java Writers.

The consumer of these events is the Bookkeeper, which attaches the new data files to an Iceberg table.

Here’s a high level diagram of the queue-based solution.


The Configuration

An Azure storage account consists of four services: Containers, File shares, Queues, and Tables. The two data storage services we are interested in are the Containers and the Queues. Containers are where the blob files are stored. Queues are where we will send file creation events.

Thus, we configure Event Grid to emit events every time a file is created inside a particular storage container. We configure Event Grid to publish these file creation events to a storage Queue.

Since we are only interested in file creation events, we configure Event Grid to send only the “Blob Created” events.

Also, we are only interested in files created under a certain path and with a certain file extension.

Finally, we can further narrow the events we will be processing by specifying advanced filters. When a Java Writer is done writing into a file, it invokes the REST call FlushWithClose. In the advanced filters, we specify data.api equals FlushWithClose.

We can target a particular Iceberg table or all Iceberg tables by specifying a subject filter that matches a substring of the blob path. In our case we used /data/, which is the data folder used by all Iceberg tables.

When Java Writers create parquet files inside the /data/ folder, events are published to the storage Queue. We can use the Azure portal to monitor activity in the Event Grid. In particular, we can see that the delivered events are a subset of all the published events.

The Writers

The implementation of the Writers is essentially the same as in our previous file-based solution. In fact, the Writers are even a little simpler since they no longer need to publish notifications themselves; Azure Event Grid takes care of that. The Writers only write parquet files.

The Bookkeeper

The Bookkeeper is still responsible for registering newly created data files with the Iceberg table. However, it now determines which files need to be appended by reading a queue of FlushWithClose events. We create a Java storage queue client like this:

DataLakeTokenCredential token = new DataLakeTokenCredential();
String queueURL = Constants.hadoopConf.get("queue.location");
String queueName = Constants.hadoopConf.get("queue.name");

this.queueClient = new QueueClientBuilder()
    .endpoint(queueURL)
    .credential(token)
    .queueName(queueName)
    .buildClient();

Notice the use of our DataLakeTokenCredential class. Since the storage queue and the data lake (containers) reside within the same storage account, we leverage the same authentication mechanism for both.
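
The DataLakeTokenCredential class itself is not shown here. As a minimal sketch, assuming it simply delegates to DefaultAzureCredential from the azure-identity library (the class name comes from the article, but the delegation shown below is an assumption, not the author's actual implementation), it could look like this:

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import reactor.core.publisher.Mono;

// Hypothetical sketch: a TokenCredential that delegates to DefaultAzureCredential,
// so the same credential type can authenticate both the data lake and the queue.
public class DataLakeTokenCredential implements TokenCredential {

    private final DefaultAzureCredential delegate =
        new DefaultAzureCredentialBuilder().build();

    @Override
    public Mono<AccessToken> getToken(TokenRequestContext request) {
        return delegate.getToken(request);
    }
}

Because both the data lake and queue client builders accept any TokenCredential, a single credential class (and a single Azure AD identity with the appropriate roles on the storage account) covers both services.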

Retrieving the events is as simple as calling receiveMessages.

// Receive up to MAX_MESSAGES events (60s visibility timeout, 10s receive timeout)
List<QueueMessageItem> messages = queueClient
    .receiveMessages(MAX_MESSAGES, Duration.ofSeconds(60),
        Duration.ofSeconds(10), Context.NONE)
    .stream()
    .collect(Collectors.toList());

Each of these messages is a FlushWithClose event containing the file path and file size. The message payload is JSON that looks like this:

"data": {
"api": "FlushWithClose",
"contentType": "application/x-compressed",
"contentLength": 115739,
"blobType": "BlockBlob",
"blobUrl": "https://<accountname>.blob.core.windows.net/users/iceberg/schema/data/file.parquet",
"url": "https://<accountname>.blob.core.windows.net/users/iceberg/schema/data/file.parquet",
},
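
The article does not show how these JSON payloads are turned into file paths and sizes. A minimal sketch, assuming the event body delivered to the storage queue is Base64-encoded JSON (a common default for Event Grid deliveries to storage queues) and parsing it with Jackson, might look like this; the EventParser and FileInfo names are hypothetical:

import java.util.Base64;

import com.azure.storage.queue.models.QueueMessageItem;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper, not part of the original article.
public class EventParser {

    // Simple value holder (Java 16+ record) for the two fields the Bookkeeper needs.
    public record FileInfo(String filePath, long fileSize) {}

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static FileInfo parse(QueueMessageItem message) throws Exception {
        // Assumption: the queue message body is Base64-encoded Event Grid JSON.
        byte[] decoded = Base64.getDecoder().decode(message.getBody().toString());
        JsonNode data = MAPPER.readTree(new String(decoded)).get("data");
        return new FileInfo(data.get("url").asText(), data.get("contentLength").asLong());
    }
}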

In our previous file-based solution, the Monikers (the messages) were written by the Java Writers and contained serialized Iceberg DataFile objects. Thus, the Bookkeeper could read these objects and directly do the following:

List<DataFile> dataFiles = readMonikers();

AppendFiles append = this.table.newAppend();
for (DataFile dataFile : dataFiles) {
    append.appendFile(dataFile);
}
append.commit();

Now that we are using Azure Event Grid, we do not control the content of the messages. The Bookkeeper thus needs to create Iceberg DataFile objects using the file path and file size information provided by Event Grid.

We are missing the parquet metrics; fortunately, Iceberg has the necessary API to retrieve metrics from a parquet file.

InputFile in = this.table.io().newInputFile(f);
Metrics metrics = ParquetUtil.fileMetrics(in,
    MetricsConfig.forTable(this.table));

Thanks to Samrose Ahmed for showing me this trick. Here’s his blog Serverless asynchronous Iceberg data ingestion, which uses AWS SQS (the AWS counterpart of the Azure storage queue used here).

We now have all the information necessary to create an Iceberg DataFile object and commit it to an Iceberg table.

AppendFiles append = this.table.newAppend();

DataFile dataFile = DataFiles.builder(this.partitionSpec)
    .withPath(filePath)
    .withFileSizeInBytes(fileSize)
    .withFormat("PARQUET")
    .withMetrics(metrics)
    .build();

append.appendFile(dataFile);
append.commit();
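
One housekeeping step, not shown in the original walkthrough, is worth mentioning: receiveMessages does not remove messages from the storage queue. Once a batch has been committed, each message should be deleted using its message ID and pop receipt; otherwise it becomes visible again after the visibility timeout and would be processed twice. A minimal sketch:

// After a successful commit, remove the processed messages from the queue
// so they are not delivered again once the visibility timeout expires.
for (QueueMessageItem message : messages) {
    queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
}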

The Performance

When we profile this implementation, we quickly realize that most of the time is spent retrieving metrics from parquet files. Retrieving parquet metrics is itself very efficient, since it only requires reading the parquet file footer. However, one REST call must be executed per file, and the latencies of these calls quickly add up. Fortunately, this is easily remedied by parallelizing the requests with a Java executor service.

List<Callable<DataFile>> callableTasks = monikers.stream()
    .map(m -> new MetricResolver(m, Constants.partitionSpec))
    .collect(Collectors.toList());

List<Future<DataFile>> futures = executorService.invokeAll(callableTasks);
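
The MetricResolver class is not shown in the article. A plausible sketch, assuming it is a Callable<DataFile> that combines the footer read and the DataFile construction shown above (the constructor below takes the file path and size directly, whereas the article builds it from a moniker/message object, so the signature is an assumption), could look like this:

import java.util.concurrent.Callable;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;

// Hypothetical sketch of the MetricResolver task submitted to the executor service.
public class MetricResolver implements Callable<DataFile> {

    private final Table table;
    private final PartitionSpec spec;
    private final String filePath;
    private final long fileSize;

    public MetricResolver(Table table, PartitionSpec spec, String filePath, long fileSize) {
        this.table = table;
        this.spec = spec;
        this.filePath = filePath;
        this.fileSize = fileSize;
    }

    @Override
    public DataFile call() {
        // Each task issues its own footer read, so many files are resolved in parallel.
        InputFile in = table.io().newInputFile(filePath);
        Metrics metrics = ParquetUtil.fileMetrics(in, MetricsConfig.forTable(table));
        return DataFiles.builder(spec)
            .withPath(filePath)
            .withFileSizeInBytes(fileSize)
            .withFormat("PARQUET")
            .withMetrics(metrics)
            .build();
    }
}

Once invokeAll returns, the resolved DataFile objects can be gathered from the futures and appended in a single commit:

AppendFiles append = table.newAppend();
for (Future<DataFile> future : futures) {
    append.appendFile(future.get()); // get() may throw InterruptedException/ExecutionException
}
append.commit();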

The Results

By leveraging an executor service for these latency-bound operations, we can greatly improve performance. Even when appending a large number of data files (hundreds), processing the queue messages takes less than a second, and reading the parquet metrics takes one to three seconds.

The only operation that cannot be parallelized with an executor service is the Iceberg commit itself. This operation is usually quick, taking a second or two, although it can occasionally take up to 20 seconds when Iceberg reorganizes the manifests.

If we measure the delta between the time a data file is flushed and the time it is committed to the Iceberg table, we observe overall latencies ranging from as low as 3 seconds up to about 20 seconds.

The Conclusion

Using Azure Event Grid yields results similar to our previous file-based solution, but they are more consistent. We are no longer at the mercy of the data lake file-listing operation, which can sometimes take a long time to execute, especially when the data lake is heavily used.

We also estimate the cost of “messaging” using Event Grid to be about 1/10th of “messaging” via a file-based solution. The storage cost of actual data files is the same in both solutions.

Leveraging Azure Event Grid is a great way to notify the Bookkeeper of newly created data files. However, if your storage appliance does not support a notification mechanism, using a file-based solution can also be viable.

If you are interested in evaluating these experiments for yourself, you can find the code used in these articles here.

