Using Python UDFs and Snowflake's Snowpark to Build and Deploy Machine Learning Models, Part 1
by Chris Kuchar, July 2022


Photo from Alessio Soggetti via Unsplash

This guide shows how to use Snowflake's Snowpark with Python UDFs to leverage Snowflake's compute power to run machine learning models written in Python.

Import Libraries

import snowflake.connector
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import Window
from snowflake.snowpark.functions import udf, max, min, count, avg, sum, col, lit, listagg
import mlxtend
from mlxtend.feature_selection import ColumnSelector
import lightgbm as lgb
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import MultiLabelBinarizer, OneHotEncoder, FunctionTransformer
from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import balanced_accuracy_score
from sklearn import datasets

Data

We will use the Digits dataset from sklearn's datasets module (Alpaydin & Kaynak, 1998). This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.

This allows for the sharing and adaptation of the datasets for any purpose, provided that the appropriate credit is given.

digits = pd.DataFrame(datasets.load_digits().data)
digits['target'] = datasets.load_digits().target
digits.head()
Image by Author
print(datasets.load_digits().DESCR)
Image by Author

Let's split the Digits data into training and validation sets and save them as separate tables in Snowflake. We can later connect to these using Snowpark:

#Set up the Snowflake connection
ctx = snowflake.connector.connect(
    user='<user>',
    password='<password>',
    account='<account-identifier>',
    database='<database>',
    warehouse='<warehouse>',
    role='<role>',
    schema='<schema>'
)
#Create column names for the tables
cols1=['X' + str(x) for x in range(0,64)]
cols1.append('TARGET')
digits.columns=cols1
X=digits.drop(columns='TARGET')
y=digits['TARGET']
X_train, X_valid, y_train, y_valid = train_test_split(X, y, random_state=1234, test_size=.33)
#Recombine the features and target into training and validation DataFrames
train=pd.concat([X_train,y_train], axis=1)
valid=pd.concat([X_valid,y_valid], axis=1)
#Build the Snowflake column definitions to match the Digits column names
snowflake_cols=['X' + str(x) + ' integer' for x in range(0,64)]
s = ', '.join(snowflake_cols)
#Create the training table in Snowflake
ctx.cursor().execute(
    """CREATE OR REPLACE TABLE
    DIGITS_TRAINING_DATA(""" + s + """, target integer)""")
#Write the training DataFrame into the Snowflake table
write_pandas(ctx, train, 'DIGITS_TRAINING_DATA')
#Create the validation table in Snowflake
ctx.cursor().execute(
    """CREATE OR REPLACE TABLE
    DIGITS_VALIDATION_DATA(""" + s + """, target integer)""")
#Write the validation DataFrame into the Snowflake table
write_pandas(ctx, valid, 'DIGITS_VALIDATION_DATA')
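
To confirm both loads landed, you can run a quick row count through the same connector cursor; a minimal sketch:

#Sanity check: count the rows written to each table
for table in ['DIGITS_TRAINING_DATA', 'DIGITS_VALIDATION_DATA']:
    n = ctx.cursor().execute('SELECT COUNT(*) FROM ' + table).fetchone()[0]
    print(table, n)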

Modeling

Connect to the data within Snowpark:

#Create a Snowpark session to connect to the saved tables.
def create_session_object():
    connection_parameters = {
        "account": "<account-identifier>",
        "user": "<user>",
        "password": "<password>",
        "role": "<role>",
        "warehouse": "<warehouse>",
        "database": "<database>",
        "schema": "<schema>"
    }
    session = Session.builder.configs(connection_parameters).create()
    print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())
    return session
#Create two sessions, one for pulling the initial data and one for pushing the UDF to Snowflake. I've found it tends to fail if I use just one session
session=create_session_object()
session2=create_session_object()
cols=session.table('DIGITS_TRAINING_DATA')
cols.schema.fields
Image by Author

Pull the data from Snowflake into our local environment:

tbl=pd.DataFrame(cols.collect())
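
Note that Snowpark DataFrames also have a to_pandas() method, which materializes the result locally with the Snowflake column names preserved; a minimal alternative sketch using the same session:

#Alternative: pull the table back as a pandas DataFrame with column names intact
tbl = session.table('DIGITS_TRAINING_DATA').to_pandas()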

Check for missing values:

count_nas=pd.DataFrame(tbl.isna().sum())
count_nas[count_nas[0]>0]
Image by Author

If there were missing values, we could fill them in with:

tbl=tbl.fillna(0)

Split into features and response variable:

#This is already our training set so no need to use train_test_split here. 
X=tbl.drop(columns='TARGET')
y=tbl['TARGET']

Build out a pipeline preprocessor for all the features:

numeric_features=['X0', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10',
'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X17', 'X18', 'X19', 'X20',
'X21', 'X22', 'X23', 'X24', 'X25', 'X26', 'X27', 'X28', 'X29', 'X30',
'X31', 'X32', 'X33', 'X34', 'X35', 'X36', 'X37', 'X38', 'X39', 'X40',
'X41', 'X42', 'X43', 'X44', 'X45', 'X46', 'X47', 'X48', 'X49', 'X50',
'X51', 'X52', 'X53', 'X54', 'X55', 'X56', 'X57', 'X58', 'X59', 'X60',
'X61', 'X62', 'X63']
numeric_cols = Pipeline(steps=[
('selector', ColumnSelector(numeric_features))])
# Combine the selected feature pipelines with FeatureUnion (only numeric features here)
preprocessor = FeatureUnion([
('select_numeric_cols',numeric_cols)
])
pipe_feat_un = Pipeline(steps=[('preprocessor', preprocessor)])

If you have a few variable transformations to do before modeling, you can implement them in the pipeline and they will be packaged up along with the model inside the UDF, as in the sketch below.
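
For example, here is a minimal sketch of adding a scaling step to the numeric pipeline; the StandardScaler is illustrative only and is not part of the model trained in this guide:

from sklearn.preprocessing import StandardScaler
#Illustrative only: any transformer added here is serialized into the UDF along with the model
numeric_cols_scaled = Pipeline(steps=[
    ('selector', ColumnSelector(numeric_features)),
    ('scaler', StandardScaler())
])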

Add a LightGBM model to the pipeline:

clf = make_pipeline(lgb.LGBMClassifier())
model = make_pipeline(pipe_feat_un, clf)

Fit the model on the training data:

model.fit(X,y)
Image by Author
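
As a quick sanity check before registering anything in Snowflake, we can score the fitted pipeline on the validation split still held in memory from the earlier train_test_split; a minimal sketch assuming X_valid and y_valid are still defined:

#Local sanity check on the held-out validation split
preds = model.predict(X_valid)
print(balanced_accuracy_score(y_valid, preds))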

Save the model as a UDF in Snowflake

session2.clear_packages() #Clear out all existing packages
session2.clear_imports() #Clear out all existing imports
session2.add_import('/opt/anaconda3/lib/python3.8/site-packages/mlxtend') #Add mlxtend as an import, since it is not available within Snowpark
session2.add_packages('scikit-learn','lightgbm','pandas') #Add these packages, which are available within Snowpark, to the UDF
session2.sql('create stage if not exists MODELSTAGE').collect() #Create a model stage if it does not already exist
Image by Author

I want to call out the add_import function. If there are any libraries you need that aren't available within Snowpark, you can easily add them, which makes Snowpark conveniently flexible.
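
The same pattern works for your own helper code; a minimal sketch, where the path and module name are hypothetical placeholders:

#Hypothetical example: ship a local helper module alongside the UDF
session2.add_import('/path/to/my_feature_utils.py')
#Inside the UDF body you could then do: import my_feature_utils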

Create a Python UDF:

@udf(name='lightgbm_snowpark_digits', is_permanent=True, stage_location='@MODELSTAGE', replace=True, session=session2)
def predict_digits(args: list) -> float:
    row = pd.DataFrame([args], columns=['X0', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10',
                                        'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X17', 'X18', 'X19', 'X20',
                                        'X21', 'X22', 'X23', 'X24', 'X25', 'X26', 'X27', 'X28', 'X29', 'X30',
                                        'X31', 'X32', 'X33', 'X34', 'X35', 'X36', 'X37', 'X38', 'X39', 'X40',
                                        'X41', 'X42', 'X43', 'X44', 'X45', 'X46', 'X47', 'X48', 'X49', 'X50',
                                        'X51', 'X52', 'X53', 'X54', 'X55', 'X56', 'X57', 'X58', 'X59', 'X60',
                                        'X61', 'X62', 'X63'])
    #model.predict returns an array; return the single prediction as a float
    return float(model.predict(row)[0])

Three options for predicting:

  1. Use Snowpark to predict using our saved UDF and bring the data back to the local environment:
session2.table('DIGITS_VALIDATION_DATA').select(F.call_udf("lightgbm_snowpark_digits",\
F.array_construct('X0', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10',
'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X17', 'X18', 'X19', 'X20',
'X21', 'X22', 'X23', 'X24', 'X25', 'X26', 'X27', 'X28', 'X29', 'X30',
'X31', 'X32', 'X33', 'X34', 'X35', 'X36', 'X37', 'X38', 'X39', 'X40',
'X41', 'X42', 'X43', 'X44', 'X45', 'X46', 'X47', 'X48', 'X49', 'X50',
'X51', 'X52', 'X53', 'X54', 'X55', 'X56', 'X57', 'X58', 'X59', 'X60',
'X61', 'X62', 'X63')).alias('DIGITS_VALIDATION_DATA_PREDICTION')).collect()
Image by Author

2. Use Snowpark to predict using our saved UDF, and save the output as a table in Snowflake:

session2.table('DIGITS_VALIDATION_DATA').select(F.call_udf("lightgbm_snowpark_digits",\
F.array_construct('X0', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10',
'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X17', 'X18', 'X19', 'X20',
'X21', 'X22', 'X23', 'X24', 'X25', 'X26', 'X27', 'X28', 'X29', 'X30',
'X31', 'X32', 'X33', 'X34', 'X35', 'X36', 'X37', 'X38', 'X39', 'X40',
'X41', 'X42', 'X43', 'X44', 'X45', 'X46', 'X47', 'X48', 'X49', 'X50',
'X51', 'X52', 'X53', 'X54', 'X55', 'X56', 'X57', 'X58', 'X59', 'X60',
'X61', 'X62', 'X63')).alias('DIGITS_VALIDATION_DATA_PREDICTION')).write.mode('overwrite').saveAsTable('light_gbm_snowpark_digits_validation')
#array_construct is how Snowflake passes the data into the UDF: a single column containing an array of all the feature values, similar to Spark's feature-vector format.

Within the warehouse, database, and schema you connected to in Snowpark, you will now see your saved table within Snowflake.
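
If you want to inspect the saved output locally, you can read that table back through Snowpark; a minimal sketch:

#Read the saved predictions back into pandas for inspection
preds_tbl = session2.table('light_gbm_snowpark_digits_validation').to_pandas()
preds_tbl.head()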

3. Use SQL in Snowflake to call the UDF and predict on data within a Snowflake table:

select lightgbm_snowpark_digits(array_construct(X0, X1, X2, X3, X4, X5, X6, X7, X8, X9, X10,
X11, X12, X13, X14, X15, X16, X17, X18, X19, X20,
X21, X22, X23, X24, X25, X26, X27, X28, X29, X30,
X31, X32, X33, X34, X35, X36, X37, X38, X39, X40,
X41, X42, X43, X44, X45, X46, X47, X48, X49, X50,
X51, X52, X53, X54, X55, X56, X57, X58, X59, X60,
X61, X62, X63)) from "DIGITS_VALIDATION_DATA";

I hope you liked this quick guide on how to train a model locally, package it as a UDF, upload that UDF to Snowflake using Snowpark, and then predict on data using either Snowpark or Snowflake SQL.

References:

Alpaydin, E., & Kaynak, C. (1998). Optical Recognition of Handwritten Digits. UCI Machine Learning Repository.



