
Introducing Quix Streams: an open-source Python library for Kafka | by Tomáš Neubauer | Mar, 2023



Image by author

You might be wondering why the world needs another Python framework for Kafka. After all, there are a lot of existing libraries and frameworks to choose from, such as kafka-python, Faust, PySpark, and so on.

The focus of Quix Streams, however, is time-series and telemetry data, so its features are optimized for telemetry-related use cases. This could be device telemetry (it was originally road-tested on sensor data from Formula 1 racing cars) or other types of telemetry data such as metrics, logs, and traces.

It’s also designed to help you get the best out of Apache Kafka’s horizontal scaling capabilities. This is especially important if you need to process a large firehose of data (e.g. 60,000 data points a second).

Nevertheless, you don’t have to be doing real-time ML on Formula 1 telemetry to find Quix Streams useful — my hope is that its simplicity and performance will make many of you more productive, and I’m excited to see what other use cases you find for it.

To help you learn more about what you can do with this library, here’s a list of core features with simplified code samples to demonstrate how they work:

Use Pandas DataFrames to produce data more efficiently

Time-series parameters are emitted at the same time, so they share one timestamp. Handling this data independently is wasteful. The library uses a tabular system that can, for instance, work natively with Pandas DataFrames. Each row has a timestamp, and user-defined tags serve as indexes.

  • For a complete, runnable example of how to use the library with Pandas to stream data directly from a CSV, see this gist.
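For illustration, here’s a minimal sketch of producing a DataFrame, assuming the timeseries interface accepts a Pandas DataFrame directly (the consumer example later in this article shows DataFrames on the read side); the broker address, topic name, column names, and the tag-column convention are illustrative assumptions rather than details taken from this article:

import quixstreams as qx
import pandas as pd

# Minimal sketch (assumptions noted above): publish a small DataFrame as time-series data
client = qx.KafkaStreamingClient('127.0.0.1:9092')
topic_producer = client.get_topic_producer(topic_id_or_name = "mytesttopic")
stream = topic_producer.create_stream()

df = pd.DataFrame({
    "timestamp": [pd.Timestamp.utcnow()] * 3,   # one timestamp per row
    "ParameterA": [0.1, 0.2, 0.3],               # numeric time-series column
    "TAG__bus_id": ["bus-123AAAV"] * 3,          # assumed tag-column naming convention
})

stream.timeseries.publish(df)   # assumed to accept a DataFrame directly
stream.close()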

Produce time-series data without worrying about serialization or deserialization

Quix Streams serializes and deserializes time-series data using different codecs and optimizations to minimize payloads in order to increase throughput and reduce latency.

  • The following example shows data being appended to a stream with the add_value method:
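Here’s a condensed sketch of that pattern, based on the full producer example later in this article; it assumes a stream created as in that example, and the parameter names and values are illustrative:

import datetime

# Append one timestamped row with two values, then publish it to the stream
stream.timeseries \
    .buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10.5) \
    .add_value("ParameterB", 3.2) \
    .publish()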

Leverage built-in buffers to optimize processing operations for windows of time-series data

If you’re sending data at high frequency, processing each message can be costly. The library provides built-in time-series buffers for producing and consuming, allowing several configurations for balancing between latency and cost.

  • For example, you can configure the library to release a packet from the buffer whenever 100 items of timestamped data are collected or when a certain number of milliseconds of data have elapsed (using timestamps in the data rather than the consumer machine’s clock).
buffer.packet_size = 100
buffer.time_span_in_milliseconds = 100
  • You can then read from the buffer and process it with the on_read function, as in the sketch below.
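Here’s a rough sketch of that flow; the buffer configuration comes from the snippet above, while the way the buffer is obtained and the callback signature are assumptions for illustration:

# Rough sketch (assumptions noted above): consume windows of time-series data through a buffer
def on_read(stream_consumer, data):
    # 'data' holds up to 100 timestamps, or up to 100 ms of data, whichever fills first
    print(data)

# 'stream_consumer' would come from the topic's stream-received callback,
# as shown in the consumer example later in this article
buffer = stream_consumer.timeseries.create_buffer()   # assumed factory method
buffer.packet_size = 100
buffer.time_span_in_milliseconds = 100
buffer.on_read = on_read                              # callback name from the text above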

Produce and consume different types of mixed data

This library allows you to produce and consume mixed data types, such as numbers, strings, or binary data, within the same timestamp.

  • For example, you can produce both time-series data and large binary blobs together.
  • Often, you’ll want to combine time-series data with binary data. In the following example, we combine a bus’s onboard camera feed with telemetry from its ECU so we can analyze the camera feed with context.
  • You can also produce events that include payloads:
  • For example, you might need to listen for changes in time-series or binary streams and produce an event (such as “speed limit exceeded”). These might require some kind of document to send along with the event message (e.g. transaction invoices, or a speeding ticket with photographic proof). Here’s an example for a speeding camera:
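Here’s a rough sketch of the speeding-camera case; it assumes a stream created as in the producer example later in this article, and the event API calls (stream.events, add_value) are assumptions for illustration rather than details confirmed by this article:

import datetime

# Rough sketch (assumptions noted above): emit a "speed limit exceeded" event
stream.events \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("speed-limit-exceeded", "bus-123AAAV at 87 km/h in a 50 km/h zone") \
    .publish()

# Photographic proof would be attached as binary data on the same stream;
# the exact binary-producing call is not shown in this article.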

Use stream contexts for horizontal scaling

Stream contexts allow you to bundle data from one data source into the same scope with supplementary metadata — which enables workloads to be horizontally scaled with multiple replicas.

  • In the following sample, the create_stream function is used to create a stream called bus-123AAAV which gets assigned to one particular consumer and will receive messages in the correct order:
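Here’s a minimal sketch of that call; the topic producer setup mirrors the full producer example later in this article, and the display name is illustrative:

# Create a named stream so all data from this source stays together and arrives in order
stream = topic_producer.create_stream("bus-123AAAV")   # stream ID from the text above
stream.properties.name = "Bus 123AAAV telemetry"       # illustrative display name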

Leverage built-in stateful processing for greater resiliency

The library includes an easy-to-use state store, combining blob storage and Kubernetes persistent volumes, that ensures quick recovery from any outages or disruptions.

To use it, you can create an instance of LocalFileStorage or use one of our helper classes, such as InMemoryStorage, to manage the state.

Here’s an example of a stateful sum operation for a selected column in the data:
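As a rough sketch, the following keeps a persistent running sum of one column. LocalFileStorage and InMemoryStorage are the classes named above, but their constructors and the get/set/contains_key calls used here are assumptions for illustration:

# Rough sketch (assumptions noted above): keep a persistent running sum of one column
storage = qx.InMemoryStorage(qx.LocalFileStorage())    # assumed constructors

def on_dataframe_received_handler(stream_consumer, df):
    key = "ParameterA_sum"
    total = storage.get(key) if storage.contains_key(key) else 0.0   # assumed interface
    total += df["ParameterA"].sum()    # sum the selected column for this batch
    storage.set(key, total)            # persisted so a restart can pick up where it left off
    print(total)

# Register this handler as shown in the consumer example later in this article.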

Other performance and usability enhancements

The library also includes a number of other enhancements that are designed to simplify the process of managing configuration and performance when interacting with Kafka:

  • No schema registry required: The library doesn’t need a schema registry to send different sets of types or parameters; this is handled internally by the protocol. This means that you can send more than one schema per topic.
  • Message splitting: Quix Streams automatically handles large messages on the producer side, splitting them up if required. You no longer need to worry about Kafka message limits. On the consumer side, those messages are automatically merged back.
  • Message Broker configuration: Many configuration settings are needed to use Kafka at its best, and the ideal configuration takes time. The library takes care of Kafka configuration by default but also supports custom configurations.
  • Checkpointing: The library supports manual or automatic checkpointing when you consume data from a Kafka topic. This enables you to inform the message broker that you have already processed messages up to a certain point (and not process the same messages again in case of an unplanned restart); a minimal sketch follows this list.
  • Horizontal scaling: Quix Streams handles horizontal scaling using the streaming context feature. You can scale the processing services, from one replica to many and back to one, and the library ensures that the data load is always shared between your replicas reliably.
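As a rough sketch of manual checkpointing (the commit() call and its placement are assumptions for illustration, not confirmed by this article):

# Rough sketch (assumptions noted above): manually acknowledge processed messages
topic_consumer = client.get_topic_consumer(topic_id_or_name = "mytesttopic",
                                           consumer_group = "empty-destination")

# ... consume and process a batch of messages ...

topic_consumer.commit()   # tell the broker everything up to this point has been processed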

For a detailed overview of features, see the library documentation.

To quickly try out Quix Streams, you just need to install the library and set up a local Kafka instance.

Install Quix Streams

Install Quix Streams with the following command:

python3 -m pip install quixstreams

Install Kafka locally

The library needs a message broker to send and receive data. To install and test Kafka locally:

  • Download the Apache Kafka binary from the Apache Kafka Download page.
  • Extract the contents of the file to a convenient location (e.g. kafka_dir), and start the Kafka services with the following commands:

Linux / macOS

<kafka_dir>/bin/zookeeper-server-start.sh config/zookeeper.properties
<kafka_dir>/bin/kafka-server-start.sh config/server.properties

Windows

<kafka_dir>\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
<kafka_dir>\bin\windows\kafka-server-start.bat .\config\server.properties

The following examples will give you a basic idea of how to produce and consume data with Quix Streams:

Here’s an example of how to produce time-series data into a Kafka Topic with Python.

import quixstreams as qx
import time
import datetime
import math

# Connect to your kafka client
client = qx.KafkaStreamingClient('127.0.0.1:9092')

# Open the output topic which is where data will be streamed out to
# If the topic does not exist, it will be created
topic_producer = client.get_topic_producer(topic_id_or_name = "mytesttopic")

# Set stream ID or leave parameters empty to get stream ID generated.
stream = topic_producer.create_stream()
stream.properties.name = "Hello World Python stream"

# Add metadata about time series data you are about to send.
stream.timeseries.add_definition("ParameterA").set_range(-1.2, 1.2)
stream.timeseries.buffer.time_span_in_milliseconds = 100

print("Sending values for 30 seconds.")

for index in range(0, 3000):
    stream.timeseries \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("ParameterA", math.sin(index / 200.0) + math.sin(index) / 5.0) \
        .publish()
    time.sleep(0.01)

print("Closing stream")
stream.close()

Here’s an example of how to consume time-series data from a Kafka Topic with Python:

import quixstreams as qx
import pandas as pd

# Connect to your kafka client
client = qx.KafkaStreamingClient('127.0.0.1:9092')

# get the topic consumer for a specific consumer group
topic_consumer = client.get_topic_consumer(topic_id_or_name = "mytesttopic",
                                            consumer_group = "empty-destination")

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # do something with the data here
    print(df)

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    # subscribe to new DataFrames being received
    # if you aren't familiar with DataFrames there are other callbacks available
    # refer to the docs here: https://docs.quix.io/sdk/subscribe.html
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler

# subscribe to new streams being received
topic_consumer.on_stream_received = on_stream_received_handler
print("Listening to streams. Press CTRL-C to exit.")
# Handle termination signals and provide a graceful exit
qx.App.run()

For full documentation of how to consume and produce time-series and event data with Quix Streams, see the docs.

This is the first iteration of Quix Streams, and the next release is already in the works.

The main highlight is a new feature called “streaming data frames” that simplifies stateful stream processing for users coming from a batch processing environment. It eliminates the need for users to manage state in memory, update rolling windows, deal with checkpointing and state persistence, and manage state recovery after a service unexpectedly restarts.

By introducing a familiar interface to Pandas DataFrames, my collaborators and I hope to make stream processing even more accessible to data professionals who are new to streaming data.

The following example shows how you would perform a rolling window calculation on a streaming data frame:
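Purely as an illustration of the intent (the streaming data frames API is not released yet, so every name below is a placeholder assumption rather than the final interface):

# Placeholder sketch of the planned Pandas-like interface; names are assumptions
sdf = topic_consumer.get_streaming_dataframe()   # hypothetical accessor

# A rolling 10-second mean over a speed column, written the same way you would
# write it against a static DataFrame in a notebook
sdf["speed_avg"] = sdf["speed"].rolling("10s").mean()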

Note that this is exactly how you would do the same calculation on static data in a Jupyter notebook, so it will be easy to learn for those of you who are used to batch processing.

There’s also no need to grapple with the complexity of stateful processing on streaming data — this will all be managed by the library. Moreover, although it will still feel like Pandas, it will use binary tables under the hood — which adds a significant performance boost compared to traditional Pandas DataFrames.

To find out when the next version is ready, make sure you watch the Quix Streams GitHub repo.

The roadmap should also be shaped by feedback and contributions from the wider data community:

  • If you find a bug or want to request an enhancement, feel free to log a GitHub issue.
  • If you have questions, need help, or simply want to find out more about the library, try posting a message in the Slack community “The Stream” (which I help to moderate) or check out the documentation.
  • If you want to improve the library, see the contribution guidelines.

