Techno Blender
Digitally Yours.

Anomaly Detection using Sigma Rules (Part 5) Flux Capacitor Optimization | by Jean-Claude Cote | Mar, 2023

0 67


Photo by Leora Winter on Unsplash, Shippagan, NB, Canada

This is the 5th article of our series. Refer to part 1 , part 2, part 3 and part 4 for some context.

In our previous articles, we have demonstrated the performance gains achieved by using a bloom filter. We also showed how we leveraged a bloom filter to implement temporal proximity correlations, parent/child and ancestor relationships.

So far we have been using a single bloom per host. Eventually the bloom filter will be saturated with tags and will issue a lot of false positive. Using this online bloom filter calculator, we can see the probability of getting a false positive. Notice how the false positive rate quickly increases passed 200,000 tags. (This graph is for n=200,000 and p=1%)

Image by Author

Forgetful Bloom Filter

What we need is a way to age off very old tags. We need a forgetful bloom filter. As explained in this excellent paper from Redis Labs Age-Partitioned Bloom Filter, there are many ways to achieve a forgetful bloom filter. We will use the most basic approach:

Segmentation based approaches use several disjoint segments which can be individually added and retired. The most naïf and several times mentioned approach is a sequence of plain BFs, one per generation, adding a new one and retiring the oldest when the one in use gets full.

We chose to use 10 generations. Thus we use 10 bloom filter per host. Each bloom is capable of holding up to 20,000 tags.

We use an “active” bloom to insert new tags. When the “active” bloom is full, we create a new one. When we reach 10 blooms, we discard the oldest bloom.

We query for tags by testing the “active” bloom. If the tag is not found we test the next (older) bloom until we reach the end.

Notice that for every tag we want to test, we can potentially perform 10 tests in 10 different blooms. Each tests has a certain probability of reporting a false positive. So by using 10 blooms, we increase our chances by 10. To reduce the chances of getting false positive, we use blooms with a ffp of 1/1000 rather than 1/100. In fact, we will show we can even use ffp of 1/10000

In order to accommodate multiple blooms, we will no longer store a bloom object in the state store:

val stateEncoder = Encoders.javaSerialization(BloomFilter.class)

Rather, we will persists an FluxState object holding a list of bloom filters:

val stateEncoder = Encoders.product[FluxState]

The FluxState has the following fields:

case class FluxState(
var version: Long = 0,
var active: Int = 0,
var serializedBlooms: List[Array[Byte]] = List()
) extends TagCache {

For performance reasons, we serialize the bloom filters ourselves. Since we know the size of these objects, we can optimize the serialization by pre-allocating the serialization buffers. The serializedBlooms field holds the serialized blooms. The active field keeps track of the index of the active bloom within this list. We will explain the use of the version number a bit later. This is how we serialize the blooms:

val padding = 4000
val n = tagCapacity / NUM_BLOOMS
// Formula taken from https://hur.st/bloomfilter
// m = ceil( (n * log(p)) / log(1 / pow(2, log(2))))
val mBits = Math.ceil(
(n * Math.log(desiredFpp)) / Math.log(1 / Math.pow(2, Math.log(2))))
val numBytes = (mBits / 8).toInt + padding
val byteArrayOut = new ByteArrayOutputStream(numBytes)
val store = new ObjectOutputStream(byteArrayOut)
store.writeObject(bloom)
store.close
byteArrayOut.toByteArray()

Efficient Checkpointing

We segmented our large bloom into 10 smaller ones. Due to the nature of bloom filters, the space used by 10 blooms of 20,000 tags is roughly the same as a larger 200,000 tag bloom, roughly 200KiB.

The Spark HDFS state store provider keeps all the FluxState objects in memory. If we suppose a fleet of 50,000 hosts, this results in about 10GiB of RAM. In fact, the memory usage of the HDFS state store is measured to be 25GiB.

Image by Author

The reason why it’s much higher is that the HDFS state store keeps 2 copies of the states by default. We can changed it to store a single copy using spark.sql.streaming.maxBatchesToRetainInMemory. This brings down memory usage to about 12GiB of RAM, which corresponds to our estimate.

As part of checkpointing, Spark writes out all the states to the data lake and it does this after every micro-batch completes. Spark spends a lot of time persisting 12 GiB of state and does so over and over.

However, during every micro-batch, we only modify 1 out of 10 blooms (the active bloom). The other 9 blooms might be queried but remain unchanged. The default HDFS state store provider is unaware of which bloom is changed, it simply persists the FluxState object. If the state store provider knew which bloom is the active bloom, it could be more efficient and only checkpoint the modified active bloom. It could potentially cut down serialization to 1/10th of the 12GiB.

Custom State Store Provider

The HDFSBackedStateStoreProvider.scala class handles the put and get state requests. It holds these key/value pairs in an in-memory map and persists these key/value to the datalake.

With some slight modifications to the put and get methods, we can specialize the behavior of the HDFSBackedStateStoreProvider class and make it aware of our forgetful bloom filter strategy.

The basic idea is to store each bloom segment under a seperate key. Instead of storing the entire state under the key “windows_host_abc”, we will store each segment under “windows_host_abc_segment1”, “windows_host_abc_segment2”, “windows_host_abc_segment3”, etc.

The put function receives the key and value to store. The key will be a host_id and the value will be a FluxState object. Spark encodes both the key and value as an UnsafeRow before calling this method:

override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
require(value != null, "Cannot put a null value")
verify(state == UPDATING, "Cannot put after already committed or aborted")
//val keyCopy = key.copy()
val valueCopy = value.copy()

// Create the key to store this bloom using the bloom id (active)
val newRowKey = makeRowKey(key.getString(0), getActiveValue(valueCopy))
// Store key/value as normal
mapToUpdate.put(newRowKey, valueCopy)
writeUpdateToDeltaFile(compressedStream, newRowKey, valueCopy)
}

Our put function is exactly the same as the original. The only change we make is to the key. We append the “active” bloom index to the original key.

It’s important to note that we also modified our FluxState class to serialize only the “active” bloom, rather than all 10 blooms.

def toState(): FluxState = {
version += 1
// we only store the active blooms, all other blooms were un-changed
val approxsize = tagCapacity / NUM_BLOOMS * 3
val byteArrayOut = new ByteArrayOutputStream(approxsize)
val store = new ObjectOutputStream(byteArrayOut)
store.writeObject(getActiveBloom())
store.close
serializedBlooms = List(byteArrayOut.toByteArray())
}

The original get method was like this:

override def get(key: UnsafeRow): UnsafeRow = {
mapToUpdate.get(key)
}

We modified the get method to gather the 10 blooms segments of a host_id. First, we build a list of FluxStates by iterating over the 10 bloom indexes. Then, we create a new FluxState which holds all the blooms. We determine which one is the active bloom using the version number.

override def get(key: UnsafeRow): UnsafeRow = {
// CCCS: The list of blooms for this key are put
// in separate entries in the store
// we will find all of these entries and create a FluxState
val groupKey = key.getString(0)
val fluxStates = Range(0, NUM_BLOOMS)
.map(bloomIndex => mapToUpdate.get(makeRowKey(groupKey, bloomIndex)))
.filter(_ != null)
.map(value => deserializeFluxState(value))

if(fluxStates.length > 0) {
makeRowValue(coalesceFluxStates(fluxStates))
}
else {
// else we found none of the blooms
null
}
}

Adjusting False Positive Probability

Now that we have segmented the bloom in 10 parts, we can potentially query 10 blooms and thus have more chances of false positives. To remedy this, we’ve reduce the fpp to 1/10000 which results in an overall fpp of 1/1000. This is ten times less chances then in our previous experiments. Nonetheless, because we only serialize the “active” bloom, the overall performance is much better.

Results

Previously, when we were serializing all segments, we could reach up to a capacity of 100,000 tags per host at a rate of 5,000 events per second.

Using a segmented approach where we only serialize the active bloom, we can achieve a capacity of 300,000 tags per host at a rate of 5,000 events per second. Alternatively, we can reduce the size of the bloom to accommodate more events per seconds: 200,000 tag @ 8,000 events per second

Image by Author

In all previous experiments, we have been “seeding” the newly created bloom filters with random tags. We did this to prevent the HDFSBackStore from compressing the bloom filters while it saves their state to the data lake. An empty bloom compresses to practically zero and a bloom at capacity (with maximum entropy) is pretty much incompressible. When we first launched the experiment, performance was amazing due to incredible compression. It took a very long time to see the effect of the tags in the blooms. To fix this, we seeded all blooms at 95% capacity. In other words, we have been measuring the worst case scenario.

However, in practice, blooms will fill up slowly. Some blooms will fill up faster than others. Statistically, we will never have all 50,000 blooms filled at 95% capacity at the exact same time. A more realistic simulation can be performed using random seeding.

def createBloom() = {
val bloomCapacity: Int = tagCapacity / NUM_BLOOMS
val bloom = BloomFilter.create(
Funnels.stringFunnel(),
bloomCapacity,
desiredFpp)
val prep = (bloomCapacity * Math.random()).toInt
(1 to prep).map("padding" + _).foreach(s => bloom.put(s))
bloom
}

Benefiting from compressible blooms, we can run this simulation with a rate of 10,000 events per seconds and a overall capacity of 400,000 tags per host for a total of 20 billion tags on a single Spark worker. This is way more than the 100 million tags we could achieve with stream-stream join.

Storing and retrieving tags from the blooms is extremely fast. On average, a single machine can perform about 200,000 tests a second. Storing tags in the bloom is a bit more costly, but a machine can still store 20,000 tags a second. This means we can support a lot of Sigma rules simultaneously.

Here’s a summary of the different strategies used in our experiments.

Image by Author

Conclusion

The results clearly show the performance improvements of the bloom strategy coupled with a custom state store which only saves the “active” bloom segments. The bloom strategy is also more generic than the stream-stream join approach since it can handle use cases such as “ancestors” and temporal proximity (ordered and un-ordered). The proof of concept can be found here https://github.com/cccs-jc/flux-capacitor. If you have novel use cases for this flux-capacitor function we would love to hear about it.


Photo by Leora Winter on Unsplash, Shippagan, NB, Canada

This is the 5th article of our series. Refer to part 1 , part 2, part 3 and part 4 for some context.

In our previous articles, we have demonstrated the performance gains achieved by using a bloom filter. We also showed how we leveraged a bloom filter to implement temporal proximity correlations, parent/child and ancestor relationships.

So far we have been using a single bloom per host. Eventually the bloom filter will be saturated with tags and will issue a lot of false positive. Using this online bloom filter calculator, we can see the probability of getting a false positive. Notice how the false positive rate quickly increases passed 200,000 tags. (This graph is for n=200,000 and p=1%)

Image by Author

Forgetful Bloom Filter

What we need is a way to age off very old tags. We need a forgetful bloom filter. As explained in this excellent paper from Redis Labs Age-Partitioned Bloom Filter, there are many ways to achieve a forgetful bloom filter. We will use the most basic approach:

Segmentation based approaches use several disjoint segments which can be individually added and retired. The most naïf and several times mentioned approach is a sequence of plain BFs, one per generation, adding a new one and retiring the oldest when the one in use gets full.

We chose to use 10 generations. Thus we use 10 bloom filter per host. Each bloom is capable of holding up to 20,000 tags.

We use an “active” bloom to insert new tags. When the “active” bloom is full, we create a new one. When we reach 10 blooms, we discard the oldest bloom.

We query for tags by testing the “active” bloom. If the tag is not found we test the next (older) bloom until we reach the end.

Notice that for every tag we want to test, we can potentially perform 10 tests in 10 different blooms. Each tests has a certain probability of reporting a false positive. So by using 10 blooms, we increase our chances by 10. To reduce the chances of getting false positive, we use blooms with a ffp of 1/1000 rather than 1/100. In fact, we will show we can even use ffp of 1/10000

In order to accommodate multiple blooms, we will no longer store a bloom object in the state store:

val stateEncoder = Encoders.javaSerialization(BloomFilter.class)

Rather, we will persists an FluxState object holding a list of bloom filters:

val stateEncoder = Encoders.product[FluxState]

The FluxState has the following fields:

case class FluxState(
var version: Long = 0,
var active: Int = 0,
var serializedBlooms: List[Array[Byte]] = List()
) extends TagCache {

For performance reasons, we serialize the bloom filters ourselves. Since we know the size of these objects, we can optimize the serialization by pre-allocating the serialization buffers. The serializedBlooms field holds the serialized blooms. The active field keeps track of the index of the active bloom within this list. We will explain the use of the version number a bit later. This is how we serialize the blooms:

val padding = 4000
val n = tagCapacity / NUM_BLOOMS
// Formula taken from https://hur.st/bloomfilter
// m = ceil( (n * log(p)) / log(1 / pow(2, log(2))))
val mBits = Math.ceil(
(n * Math.log(desiredFpp)) / Math.log(1 / Math.pow(2, Math.log(2))))
val numBytes = (mBits / 8).toInt + padding
val byteArrayOut = new ByteArrayOutputStream(numBytes)
val store = new ObjectOutputStream(byteArrayOut)
store.writeObject(bloom)
store.close
byteArrayOut.toByteArray()

Efficient Checkpointing

We segmented our large bloom into 10 smaller ones. Due to the nature of bloom filters, the space used by 10 blooms of 20,000 tags is roughly the same as a larger 200,000 tag bloom, roughly 200KiB.

The Spark HDFS state store provider keeps all the FluxState objects in memory. If we suppose a fleet of 50,000 hosts, this results in about 10GiB of RAM. In fact, the memory usage of the HDFS state store is measured to be 25GiB.

Image by Author

The reason why it’s much higher is that the HDFS state store keeps 2 copies of the states by default. We can changed it to store a single copy using spark.sql.streaming.maxBatchesToRetainInMemory. This brings down memory usage to about 12GiB of RAM, which corresponds to our estimate.

As part of checkpointing, Spark writes out all the states to the data lake and it does this after every micro-batch completes. Spark spends a lot of time persisting 12 GiB of state and does so over and over.

However, during every micro-batch, we only modify 1 out of 10 blooms (the active bloom). The other 9 blooms might be queried but remain unchanged. The default HDFS state store provider is unaware of which bloom is changed, it simply persists the FluxState object. If the state store provider knew which bloom is the active bloom, it could be more efficient and only checkpoint the modified active bloom. It could potentially cut down serialization to 1/10th of the 12GiB.

Custom State Store Provider

The HDFSBackedStateStoreProvider.scala class handles the put and get state requests. It holds these key/value pairs in an in-memory map and persists these key/value to the datalake.

With some slight modifications to the put and get methods, we can specialize the behavior of the HDFSBackedStateStoreProvider class and make it aware of our forgetful bloom filter strategy.

The basic idea is to store each bloom segment under a seperate key. Instead of storing the entire state under the key “windows_host_abc”, we will store each segment under “windows_host_abc_segment1”, “windows_host_abc_segment2”, “windows_host_abc_segment3”, etc.

The put function receives the key and value to store. The key will be a host_id and the value will be a FluxState object. Spark encodes both the key and value as an UnsafeRow before calling this method:

override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
require(value != null, "Cannot put a null value")
verify(state == UPDATING, "Cannot put after already committed or aborted")
//val keyCopy = key.copy()
val valueCopy = value.copy()

// Create the key to store this bloom using the bloom id (active)
val newRowKey = makeRowKey(key.getString(0), getActiveValue(valueCopy))
// Store key/value as normal
mapToUpdate.put(newRowKey, valueCopy)
writeUpdateToDeltaFile(compressedStream, newRowKey, valueCopy)
}

Our put function is exactly the same as the original. The only change we make is to the key. We append the “active” bloom index to the original key.

It’s important to note that we also modified our FluxState class to serialize only the “active” bloom, rather than all 10 blooms.

def toState(): FluxState = {
version += 1
// we only store the active blooms, all other blooms were un-changed
val approxsize = tagCapacity / NUM_BLOOMS * 3
val byteArrayOut = new ByteArrayOutputStream(approxsize)
val store = new ObjectOutputStream(byteArrayOut)
store.writeObject(getActiveBloom())
store.close
serializedBlooms = List(byteArrayOut.toByteArray())
}

The original get method was like this:

override def get(key: UnsafeRow): UnsafeRow = {
mapToUpdate.get(key)
}

We modified the get method to gather the 10 blooms segments of a host_id. First, we build a list of FluxStates by iterating over the 10 bloom indexes. Then, we create a new FluxState which holds all the blooms. We determine which one is the active bloom using the version number.

override def get(key: UnsafeRow): UnsafeRow = {
// CCCS: The list of blooms for this key are put
// in separate entries in the store
// we will find all of these entries and create a FluxState
val groupKey = key.getString(0)
val fluxStates = Range(0, NUM_BLOOMS)
.map(bloomIndex => mapToUpdate.get(makeRowKey(groupKey, bloomIndex)))
.filter(_ != null)
.map(value => deserializeFluxState(value))

if(fluxStates.length > 0) {
makeRowValue(coalesceFluxStates(fluxStates))
}
else {
// else we found none of the blooms
null
}
}

Adjusting False Positive Probability

Now that we have segmented the bloom in 10 parts, we can potentially query 10 blooms and thus have more chances of false positives. To remedy this, we’ve reduce the fpp to 1/10000 which results in an overall fpp of 1/1000. This is ten times less chances then in our previous experiments. Nonetheless, because we only serialize the “active” bloom, the overall performance is much better.

Results

Previously, when we were serializing all segments, we could reach up to a capacity of 100,000 tags per host at a rate of 5,000 events per second.

Using a segmented approach where we only serialize the active bloom, we can achieve a capacity of 300,000 tags per host at a rate of 5,000 events per second. Alternatively, we can reduce the size of the bloom to accommodate more events per seconds: 200,000 tag @ 8,000 events per second

Image by Author

In all previous experiments, we have been “seeding” the newly created bloom filters with random tags. We did this to prevent the HDFSBackStore from compressing the bloom filters while it saves their state to the data lake. An empty bloom compresses to practically zero and a bloom at capacity (with maximum entropy) is pretty much incompressible. When we first launched the experiment, performance was amazing due to incredible compression. It took a very long time to see the effect of the tags in the blooms. To fix this, we seeded all blooms at 95% capacity. In other words, we have been measuring the worst case scenario.

However, in practice, blooms will fill up slowly. Some blooms will fill up faster than others. Statistically, we will never have all 50,000 blooms filled at 95% capacity at the exact same time. A more realistic simulation can be performed using random seeding.

def createBloom() = {
val bloomCapacity: Int = tagCapacity / NUM_BLOOMS
val bloom = BloomFilter.create(
Funnels.stringFunnel(),
bloomCapacity,
desiredFpp)
val prep = (bloomCapacity * Math.random()).toInt
(1 to prep).map("padding" + _).foreach(s => bloom.put(s))
bloom
}

Benefiting from compressible blooms, we can run this simulation with a rate of 10,000 events per seconds and a overall capacity of 400,000 tags per host for a total of 20 billion tags on a single Spark worker. This is way more than the 100 million tags we could achieve with stream-stream join.

Storing and retrieving tags from the blooms is extremely fast. On average, a single machine can perform about 200,000 tests a second. Storing tags in the bloom is a bit more costly, but a machine can still store 20,000 tags a second. This means we can support a lot of Sigma rules simultaneously.

Here’s a summary of the different strategies used in our experiments.

Image by Author

Conclusion

The results clearly show the performance improvements of the bloom strategy coupled with a custom state store which only saves the “active” bloom segments. The bloom strategy is also more generic than the stream-stream join approach since it can handle use cases such as “ancestors” and temporal proximity (ordered and un-ordered). The proof of concept can be found here https://github.com/cccs-jc/flux-capacitor. If you have novel use cases for this flux-capacitor function we would love to hear about it.

FOLLOW US ON GOOGLE NEWS

Read original article here

Denial of responsibility! Techno Blender is an automatic aggregator of the all world’s media. In each content, the hyperlink to the primary source is specified. All trademarks belong to their rightful owners, all materials to their authors. If you are the owner of the content and do not want us to publish your materials, please contact us by email – [email protected]. The content will be deleted within 24 hours.
Leave a comment