Techno Blender
Digitally Yours.

Distributed Forecast of 1M Time Series in Under 15 Minutes with Spark, Nixtla, and Fugue | by Federico Garza Ramírez | Sep, 2022

0 63


Scalable Time Series Modeling with open-source projects StatsForecast, Fugue, and Spark

By Kevin Kho, Han Wang, Max Mergenthaler and Federico Garza Ramírez.

TL:DR We will show how you can leverage the distributed power of Spark and the highly efficient code from StatsForecast to fit millions of models in a couple of minutes.

Time-series modeling, analysis, and prediction of trends and seasonalities for data collected over time is a rapidly growing category of software applications.

Businesses, from electricity and economics to healthcare analytics, collect time-series data daily to predict patterns and build better data-driven product experiences. For example, temperature and humidity prediction is used in manufacturing to prevent defects, streaming metrics predictions help identify music’s popular artists, and sales forecasting for thousands of SKUs across different locations in the supply chain is used to optimize inventory costs. As data generation increases, the forecasting necessities have evolved from modeling a few time series to predicting millions.

Nixtla is an open-source project focused on state-of-the-art time series forecasting. They have a couple of libraries such as StatsForecast for statistical models, NeuralForecast for deep learning, and HierarchicalForecast for forecast aggregations across different levels of hierarchies. These are production-ready time series libraries focused on different modeling techniques.

This article looks at StatsForecast, a lightning-fast forecasting library with statistical and econometrics models. The AutoARIMA model of Nixtla is 20x faster than pmdarima, and the ETS (error, trend, seasonal) models performed 4x faster than statsmodels and are more robust. The benchmarks and code to reproduce can be found here. A huge part of the performance increase is due to using a JIT compiler called numba to achieve high speeds.

The faster iteration time means that data scientists can run more experiments and converge to more accurate models faster. It also means that running benchmarks at scale becomes easier.

In this article, we are interested in the scalability of the StatsForecast library in fitting models over Spark or Dask using the Fugue library. This combination will allow us to train a huge number of models distributedly over a temporary cluster quickly.

When dealing with large time series data, users normally have to deal with thousands of logically independent time series (think of telemetry of different users or different product sales). In this case, we can train one big model over all of the series, or we can create one model for each series. Both are valid approaches since the bigger model will pick up trends across the population, while training thousands of models may fit individual series data better.

Note: to pick up both the micro and macro trends of the time series population in one model, check the Nixtla HierarchicalForecast library, but this is also more computationally expensive and trickier to scale.

This article will deal with the scenario where we train a couple of models (AutoARIMA or ETS) per univariate time series. For this setup, we group the full data by time series, and then train each model for each group. The image below illustrates this. The distributed DataFrame can either be a Spark or Dask DataFrame.

AutoARIMA per partition — Image by Author

Nixtla previously released benchmarks with Anyscale on distributing this model training on Ray. The setup and results can be found in this blog. The results are also shown below. It took 2000 cpus to run one million AutoARIMA models in 35 minutes. We’ll compare this against running on Spark.

StatsForecast on Ray results — Image by author

First, we’ll look at the StatsForecast code used to run the AutoARIMA distributedly on Ray. This is a simplified version to run the scenario with a one million time series. It is also updated for the recent StatsForecast v1.0.0 release, so it may look a bit different from the code in the previous benchmarks.

Running StatsForecast distributedly on Ray

The interface of StatsForecast is very minimal. It is already designed to perform the AutoARIMA on each group of data. Just supplying the ray_address will make this code snippet run distributedly. Without it, n_jobswill indicate the number of parallel processes for forecasting. model.forecast() will do the fit and predict in one step, and the input to this method in the time horizon to forecast.

Fugue is an abstraction layer that ports Python, Pandas, and SQL code to Spark and Dask. The most minimal interface is the transform() function. This function takes in a function and DataFrame, and brings it to Spark or Dask. We can use the transform() function to bring StatsForecast execution to Spark.

There are two parts to the code below. First, we have the forecast logic defined in the forecast_series function. Some parameters are hardcoded for simplicity. The most important one is that n_jobs=1 . This is because Spark or Dask will already serve as the parallelization layer, and having two stages of parallelism can cause resource deadlocks.

Running Statsforecast on Spark with Fugue

Second, the transform() function is used to apply the forecast_series() function on Spark. The first two arguments are the DataFrame and function to be applied. Output schema is a requirement for Spark, so we need to pass it in, and the partition argument will take care of splitting the time series modelling by unique_id.

This code already works and returns a Spark DataFrame output.

The transform()above is a general look at what Fugue can do. In practice, the Fugue and Nixtla teams collaborated to add a more native FugueBackendto the StatsForecast library. Along with it is a utility forecast() function to simplify the forecasting interface. Below is an end-to-end example of running StatsForecast on one million time series.

We just need to create the FugueBackend, which takes in a SparkSession and passes it to forecast() . This function can take either a DataFrame or file path to the data. If a file path is provided, it will be loaded with the parallel backend. In this example above, we replaced the file each time we ran the experiment to generate benchmarks.

It’s also important to note that we can test locally before running the forecast()on full data. All we have to do is not supply anything for the parallel argument; everything will run on Pandas sequentially.

The benchmark results can be seen below. As of the time of this writing, Dask and Ray made recent releases, so only the Spark metrics are up to date. We will make a follow-up article after running these experiments with the updates.

Spark and Dask benchmarks for StatsForecast at scale

Note: The attempt was to use 2000 cpus but we were limited by available compute instances on AWS.

The important part here is that AutoARIMA trained one million time series models in less than 15 minutes. The cluster configuration is attached in the appendix. With very few lines of code, we were able to orchestrate the training of these time series models distributedly.

Training thousands of time series models distributedly normally takes a lot of coding with Spark and Dask, but we were able to run these experiments with very few lines of code. Nixtla’s StatsForecast offers the ability to quickly utilize all of the compute resources available to find the best model for each time series. All users need to do is supply a relevant parallel backend (Ray or Fugue) to run on a cluster.

On the scale of one million timeseries, our total training time took 12 minutes for AutoARIMA. This is the equivalent of close to 400 cpu-hours that we ran immediately, allowing data scientists to quickly iterate at scale without having to write the explicit code for parallelization. Because we used an ephemeral cluster, the cost is effectively the same as running this sequentially on an EC2 instance (parallelized over all cores).

  1. Nixtla StatsForecast repo
  2. StatsForecast docs
  3. Fugue repo
  4. Fugue tutorials

To chat with us:

  1. Fugue Slack
  2. Nixtla Slack

For anyone. interested in the cluster configuration, it can be seen below. This will spin up a Databricks cluster. The important thing is the node_type_id that has the machines used.

{
"num_workers": 20,
"cluster_name": "fugue-nixtla-2",
"spark_version": "10.4.x-scala2.12",
"spark_conf": {
"spark.speculation": "true",
"spark.sql.shuffle.partitions": "8000",
"spark.sql.adaptive.enabled": "false",
"spark.task.cpus": "1"
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-west-2c",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 32
},
"node_type_id": "m5.24xlarge",
"driver_node_type_id": "m5.2xlarge",
"ssh_public_keys": [],
"custom_tags": {},
"spark_env_vars": {
"MKL_NUM_THREADS": "1",
"OPENBLAS_NUM_THREADS": "1",
"VECLIB_MAXIMUM_THREADS": "1",
"OMP_NUM_THREADS": "1",
"NUMEXPR_NUM_THREADS": "1"
},
"autotermination_minutes": 20,
"enable_elastic_disk": false,
"cluster_source": "UI",
"init_scripts": [],
"runtime_engine": "STANDARD",
"cluster_id": "0728-004950-oefym0ss"
}


Scalable Time Series Modeling with open-source projects StatsForecast, Fugue, and Spark

By Kevin Kho, Han Wang, Max Mergenthaler and Federico Garza Ramírez.

TL:DR We will show how you can leverage the distributed power of Spark and the highly efficient code from StatsForecast to fit millions of models in a couple of minutes.

Time-series modeling, analysis, and prediction of trends and seasonalities for data collected over time is a rapidly growing category of software applications.

Businesses, from electricity and economics to healthcare analytics, collect time-series data daily to predict patterns and build better data-driven product experiences. For example, temperature and humidity prediction is used in manufacturing to prevent defects, streaming metrics predictions help identify music’s popular artists, and sales forecasting for thousands of SKUs across different locations in the supply chain is used to optimize inventory costs. As data generation increases, the forecasting necessities have evolved from modeling a few time series to predicting millions.

Nixtla is an open-source project focused on state-of-the-art time series forecasting. They have a couple of libraries such as StatsForecast for statistical models, NeuralForecast for deep learning, and HierarchicalForecast for forecast aggregations across different levels of hierarchies. These are production-ready time series libraries focused on different modeling techniques.

This article looks at StatsForecast, a lightning-fast forecasting library with statistical and econometrics models. The AutoARIMA model of Nixtla is 20x faster than pmdarima, and the ETS (error, trend, seasonal) models performed 4x faster than statsmodels and are more robust. The benchmarks and code to reproduce can be found here. A huge part of the performance increase is due to using a JIT compiler called numba to achieve high speeds.

The faster iteration time means that data scientists can run more experiments and converge to more accurate models faster. It also means that running benchmarks at scale becomes easier.

In this article, we are interested in the scalability of the StatsForecast library in fitting models over Spark or Dask using the Fugue library. This combination will allow us to train a huge number of models distributedly over a temporary cluster quickly.

When dealing with large time series data, users normally have to deal with thousands of logically independent time series (think of telemetry of different users or different product sales). In this case, we can train one big model over all of the series, or we can create one model for each series. Both are valid approaches since the bigger model will pick up trends across the population, while training thousands of models may fit individual series data better.

Note: to pick up both the micro and macro trends of the time series population in one model, check the Nixtla HierarchicalForecast library, but this is also more computationally expensive and trickier to scale.

This article will deal with the scenario where we train a couple of models (AutoARIMA or ETS) per univariate time series. For this setup, we group the full data by time series, and then train each model for each group. The image below illustrates this. The distributed DataFrame can either be a Spark or Dask DataFrame.

AutoARIMA per partition — Image by Author

Nixtla previously released benchmarks with Anyscale on distributing this model training on Ray. The setup and results can be found in this blog. The results are also shown below. It took 2000 cpus to run one million AutoARIMA models in 35 minutes. We’ll compare this against running on Spark.

StatsForecast on Ray results — Image by author

First, we’ll look at the StatsForecast code used to run the AutoARIMA distributedly on Ray. This is a simplified version to run the scenario with a one million time series. It is also updated for the recent StatsForecast v1.0.0 release, so it may look a bit different from the code in the previous benchmarks.

Running StatsForecast distributedly on Ray

The interface of StatsForecast is very minimal. It is already designed to perform the AutoARIMA on each group of data. Just supplying the ray_address will make this code snippet run distributedly. Without it, n_jobswill indicate the number of parallel processes for forecasting. model.forecast() will do the fit and predict in one step, and the input to this method in the time horizon to forecast.

Fugue is an abstraction layer that ports Python, Pandas, and SQL code to Spark and Dask. The most minimal interface is the transform() function. This function takes in a function and DataFrame, and brings it to Spark or Dask. We can use the transform() function to bring StatsForecast execution to Spark.

There are two parts to the code below. First, we have the forecast logic defined in the forecast_series function. Some parameters are hardcoded for simplicity. The most important one is that n_jobs=1 . This is because Spark or Dask will already serve as the parallelization layer, and having two stages of parallelism can cause resource deadlocks.

Running Statsforecast on Spark with Fugue

Second, the transform() function is used to apply the forecast_series() function on Spark. The first two arguments are the DataFrame and function to be applied. Output schema is a requirement for Spark, so we need to pass it in, and the partition argument will take care of splitting the time series modelling by unique_id.

This code already works and returns a Spark DataFrame output.

The transform()above is a general look at what Fugue can do. In practice, the Fugue and Nixtla teams collaborated to add a more native FugueBackendto the StatsForecast library. Along with it is a utility forecast() function to simplify the forecasting interface. Below is an end-to-end example of running StatsForecast on one million time series.

We just need to create the FugueBackend, which takes in a SparkSession and passes it to forecast() . This function can take either a DataFrame or file path to the data. If a file path is provided, it will be loaded with the parallel backend. In this example above, we replaced the file each time we ran the experiment to generate benchmarks.

It’s also important to note that we can test locally before running the forecast()on full data. All we have to do is not supply anything for the parallel argument; everything will run on Pandas sequentially.

The benchmark results can be seen below. As of the time of this writing, Dask and Ray made recent releases, so only the Spark metrics are up to date. We will make a follow-up article after running these experiments with the updates.

Spark and Dask benchmarks for StatsForecast at scale

Note: The attempt was to use 2000 cpus but we were limited by available compute instances on AWS.

The important part here is that AutoARIMA trained one million time series models in less than 15 minutes. The cluster configuration is attached in the appendix. With very few lines of code, we were able to orchestrate the training of these time series models distributedly.

Training thousands of time series models distributedly normally takes a lot of coding with Spark and Dask, but we were able to run these experiments with very few lines of code. Nixtla’s StatsForecast offers the ability to quickly utilize all of the compute resources available to find the best model for each time series. All users need to do is supply a relevant parallel backend (Ray or Fugue) to run on a cluster.

On the scale of one million timeseries, our total training time took 12 minutes for AutoARIMA. This is the equivalent of close to 400 cpu-hours that we ran immediately, allowing data scientists to quickly iterate at scale without having to write the explicit code for parallelization. Because we used an ephemeral cluster, the cost is effectively the same as running this sequentially on an EC2 instance (parallelized over all cores).

  1. Nixtla StatsForecast repo
  2. StatsForecast docs
  3. Fugue repo
  4. Fugue tutorials

To chat with us:

  1. Fugue Slack
  2. Nixtla Slack

For anyone. interested in the cluster configuration, it can be seen below. This will spin up a Databricks cluster. The important thing is the node_type_id that has the machines used.

{
"num_workers": 20,
"cluster_name": "fugue-nixtla-2",
"spark_version": "10.4.x-scala2.12",
"spark_conf": {
"spark.speculation": "true",
"spark.sql.shuffle.partitions": "8000",
"spark.sql.adaptive.enabled": "false",
"spark.task.cpus": "1"
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-west-2c",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 32
},
"node_type_id": "m5.24xlarge",
"driver_node_type_id": "m5.2xlarge",
"ssh_public_keys": [],
"custom_tags": {},
"spark_env_vars": {
"MKL_NUM_THREADS": "1",
"OPENBLAS_NUM_THREADS": "1",
"VECLIB_MAXIMUM_THREADS": "1",
"OMP_NUM_THREADS": "1",
"NUMEXPR_NUM_THREADS": "1"
},
"autotermination_minutes": 20,
"enable_elastic_disk": false,
"cluster_source": "UI",
"init_scripts": [],
"runtime_engine": "STANDARD",
"cluster_id": "0728-004950-oefym0ss"
}

FOLLOW US ON GOOGLE NEWS

Read original article here

Denial of responsibility! Techno Blender is an automatic aggregator of the all world’s media. In each content, the hyperlink to the primary source is specified. All trademarks belong to their rightful owners, all materials to their authors. If you are the owner of the content and do not want us to publish your materials, please contact us by email – [email protected]. The content will be deleted within 24 hours.

Leave a comment