
Anomaly Detection using Sigma Rules (Part 2): Spark Stream-Stream Join

by Jean-Claude Cote, Feb 2023



Photo by Naveen Kumar on Unsplash

Following up on our previous article, we evaluate Spark’s ability to join a start-process event with its parent start-process event.

In this article, we evaluate how the Spark stream-stream join scales; specifically, how many events it can hold in the join window.

During our research, we evaluated a few approaches:

Full join

Doing a full stream-stream join requires caching all previous events on the right side of the join (parent start-process). However, not all past parent start-process details are necessary, since only a subset of these parent events is of interest. For example, a Sigma rule might specify a parent CommandLine containing the string .cpl; all other events can be ignored.

Join with Parent of Interest

The parents of interest are the result of applying the filter conditions to the right side of the join. This can greatly reduce the number of parents to remember. After the join is performed, we apply the conditions on the current process and on the parent process.
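As a rough illustration, the parent-side pre-filter could look like the sketch below; the DataFrame name parent_events, the column names and the .cpl condition are illustrative assumptions, not the article’s actual code.

from pyspark.sql import functions as F

# Hypothetical pre-filter: keep only parent start-process events that could
# ever satisfy a Sigma rule (here, a parent CommandLine containing ".cpl").
parents_of_interest = (
    parent_events  # assumed streaming DataFrame of parent start-process events
    .where(F.col("CommandLine").contains(".cpl"))
    .select("parent_key", "timestamp", "CommandLine", "Image")
)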

Join with Features of Parent of Interest

A better solution is to store only the results of the conditions we evaluate on the right side and discard all other attributes (CommandLine, Image, etc.). This way we keep a limited number of boolean flags rather than potentially long strings. In the diagram below, Features is a map from Sigma filter expression name to the result of that test. For example:

features = {
    'rule3_selection_atexec': False,
    'rule2_selection': True
}

During our research, we quickly came to the realization that reducing the amount of state Spark needs to store is paramount. Thus we chose to keep only the parents of interest: the parents that have at least one feature we are looking for. We discard all other parents and keep only a minimal set of information about them: join key, timestamp and feature flags.
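A minimal sketch of building such a reduced parent record in PySpark follows; the expressions, the column names and the parent_events DataFrame are assumptions chosen for illustration.

from pyspark.sql import functions as F

# Hypothetical: evaluate a couple of Sigma filter expressions on the parent
# stream and keep only the join key, the timestamp and the boolean feature flags.
parent_features = parent_events.select(
    F.col("parent_key"),
    F.col("timestamp"),
    F.create_map(
        F.lit("rule3_selection_atexec"), F.col("CommandLine").contains("atexec"),
        F.lit("rule2_selection"), F.col("Image").endswith("cmd.exe"),
    ).alias("features"),
)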

Simulation Test Harness

In order to evaluate the performance of the Spark stream-stream join, we created a mock stream of cause and effect events. In our experiments, we disabled late-arrival support by setting a watermark of zero.

from pyspark.sql import functions as F

# Simulated "cause" stream: the rate source emits `rate` rows per second.
cause = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", rate)
    .load()
    .withWatermark("timestamp", "0 seconds")  # watermark of zero: no late-arrival support
    .withColumn("name", F.lit(name))
)
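The rate source only produces timestamp and value columns; a hypothetical way to shape them into the cause columns used in the join below might look like this (the key derivation, the 50,000-host modulus and the payload column are illustrative assumptions, not the harness’s actual code).

# Hypothetical shaping of the rate-source columns into the cause stream.
cause = (
    cause
    .withColumnRenamed("timestamp", "cause_timestamp")
    .withColumn("cause_key", (F.col("value") % 50000).cast("string"))  # ~50,000 simulated hosts
    .withColumn("cause_load", F.lit("x" * 100))  # assumed fixed-size payload
)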

These cause and effect streaming events are joined using a Spark stream-stream join:

    
cause = cause.select('cause_timestamp', 'cause_key', 'cause_load')

effect = effect.select(
    'effect_timestamp',
    'effect_key',
    'effect_load',
    'host_id',
    'id',
    'name',
    'value')

joindf = (
    effect.join(
        cause,
        F.expr(f"""
            effect_key = cause_key
            and effect_timestamp >= cause_timestamp
            and effect_timestamp < (cause_timestamp + interval {window} seconds)
        """),
        "left")
)

(
    joindf
    .writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="1 minutes")
    .queryName(name)
    .option("checkpointLocation", checkpoint)
    .toTable(output_table_name)
)

Notice the join expression effect_key = cause_key and the windowing clauses stating that an effect’s time must be at or after the cause’s time, but no more than window seconds after it.

Linxiao Ma nicely explains in his article, Spark Structured Streaming Deep Dive (7) — Stream-Stream Join, that under these conditions, cause events are cached in Spark’s stateful state store for up to window seconds. The effect events, however, do not need to be stored: for every effect event passing through the stream-stream join, a lookup is made to find a corresponding cause, and a cause+effect row is written out.

Choosing the Right State Store

Spark has two state store implementations. The original, named HDFSBackedStateStore, is a simple in-memory hash map backed by HDFS files. The newer state store is based on RocksDB, an embeddable persistent key-value store written in C++. RocksDB keeps its state partly in memory and partly on local disk. At every checkpoint, Spark saves a copy of the changed files to a central location (the data lake).

Spark recommends RocksDB when you have a large number of keys to store. According to Databricks, a large Spark worker node can cache up to 100 million keys.

Since our stream-stream join will cache a lot of parents of interest rows, we decided to use the RocksDB state store in our evaluation.

.config("spark.sql.streaming.stateStore.providerClass", 
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

All of our experiments are performed on a single Spark worker with 48 GB of RAM and 16 CPUs. We simulate logs coming from 50,000 hosts.

Our test harness is very flexible. It lets us alter many parameters, such as the events per second, the size of each event, the time window for the join, key sizes, and the distribution of events within the time window.

Effect of Spark Partitions

In our first experiment, we joined effects and causes over a window of 10,000 seconds (~2.77 hours). We simulated that each parent of interest would have 12 boolean flags. We set an event rate of 10,000 per second. Here, we show the effect of varying the number of Spark partitions (individual tasks).

Varying the number of partitions has no effect on performance. The time to execute a micro-batch is about 225 seconds. Remember that we trigger every 60 seconds with .trigger(processingTime="1 minutes"); since each micro-batch takes longer than that, Spark starts the next micro-batch immediately. The event processing latency thus tops out at about 225 seconds.
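One way to observe these micro-batch times and the number of cached state rows, assuming a handle to the running StreamingQuery (here called query), is to read its last progress report:

# Minimal sketch: inspect the latest micro-batch of a running StreamingQuery.
progress = query.lastProgress  # dict form of the latest StreamingQueryProgress
if progress:
    print("micro-batch duration (ms):", progress["durationMs"]["triggerExecution"])
    for op in progress["stateOperators"]:
        print("state rows cached:", op["numRowsTotal"])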

Effect of Window Size

In this second experiment, we varied the size (time) of the stream-stream join window. The job is not stable at a rate of 5,000 events per second: each micro-batch takes longer and longer to execute, and we keep falling behind.

If we reduce the window to 18 hours and reduce the rate to 2,500 events per second, the job stabilizes and settles at about 300 seconds per micro-batch.

However, in practice, we will not keep every parent event. We will only keep the “parents of interest”: the events for which one or more Sigma rule expressions are true. What is important to measure is Spark’s ability to hold parent events, and we can easily calculate it: at 2,500 events/second over a 64,000-second window, Spark caches 160 million “parents of interest”. This result supports Databricks’ claim that the RocksDB state store can handle 100 million keys per machine. If we suppose these events come from 50,000 hosts, Spark can hold 3,200 “parents of interest” per host.
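Spelled out, the back-of-the-envelope arithmetic is:

# Capacity estimate from the numbers above.
events_per_second = 2_500
window_seconds = 64_000          # ~18 hours
hosts = 50_000

cached_parents = events_per_second * window_seconds   # 160,000,000 parents of interest
parents_per_host = cached_parents // hosts            # 3,200 per host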

It’s interesting to observe that Spark stores both the feature flags and the keys; it needs to store the cause keys in order to join them against the effect keys.

The other interesting observation is what we actually retrieve from this lookup by key: an event (a row) containing boolean flags. In practice, these flags are often mutually exclusive; that is, in a given row, only one of them might be true while all the others are false. Yet Spark stores the cause key and all of its associated flags, whether they are true or false.

Bloom Filter Join

Is there a better way to keep track of feature flags? Yes, the answer is a bloom filter.

A bloom filter is a probabilistic data structure which can store a key and test for the presence of a key. Bloom filters hash the key and use the result of the hash to set a few bits in a bit array.

Bloom filters are extremely compact. The price you pay for the space savings is the possibility of false positives. However, once a detection is made, the Sigma rule that triggered can be re-evaluated to confirm correctness.
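For context, the standard approximation of a bloom filter’s false-positive probability with m bits, k hash functions and n inserted keys is (1 - e^(-kn/m))^k; a small helper to compute it (the example numbers are purely illustrative):

import math

# Standard approximation of a bloom filter's false-positive probability.
def bloom_false_positive_rate(n_keys: int, m_bits: int, k_hashes: int) -> float:
    return (1.0 - math.exp(-k_hashes * n_keys / m_bits)) ** k_hashes

# Example: 1 million keys, 8 bits per key, 5 hash functions -> roughly 2%.
print(bloom_false_positive_rate(n_keys=1_000_000, m_bits=8_000_000, k_hashes=5))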

We can use a bloom filter to perform the join from above. Let’s suppose we use a composite key (parent_key + feature_id), where feature_id is the name given to a Sigma filter expression. Filter expressions that apply to a parent process are stored in the bloom filter, but only if they are true. Testing for the presence of the composite key returns true if the key is in the bloom filter and false if it is not.

A bloom filter can hold a certain number of keys; past that number, false positives increase drastically. By only storing features that are true, we prolong the usefulness of the bloom filter.

The join is thus modeled as a lookup in a bloom filter.
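A minimal, self-contained sketch of this idea follows; the toy bloom filter, the composite-key format and the example rule names are illustrative assumptions, not the implementation from the upcoming article.

import hashlib

# Toy bloom filter, just enough to illustrate the composite-key lookup.
class Bloom:
    def __init__(self, m_bits=1_000_000, k_hashes=5):
        self.m, self.k = m_bits, k_hashes
        self.bits = bytearray(m_bits // 8 + 1)

    def _positions(self, key: str):
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def put(self, key: str):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: str) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))

# Store only the parent features that are true, keyed by parent_key + feature_id.
bloom = Bloom()
bloom.put("host42_pid1234|rule2_selection")

# Later, the "join" for an effect event becomes a composite-key lookup.
print(bloom.might_contain("host42_pid1234|rule2_selection"))          # True
print(bloom.might_contain("host42_pid1234|rule3_selection_atexec"))   # False (with high probability)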

In our next article, we will build a custom Spark stateful join function that leverages a bloom filter.

All images unless otherwise noted are by the author

