
Using Python UDF’s and Snowflake’s Snowpark to build and deploy Machine Learning Models, Part 2 | by Chris Kuchar | Aug, 2022



How to train and predict all within a UDF uploaded to Snowpark

Image from Anthi K via Unsplash

In Part 1, I showed how to train a local model, wrap it in a Python UDF, push it to Snowflake using Snowpark, and use Snowpark or Snowflake SQL to make predictions with that UDF.

This guide, on the other hand, will show you how to build a Python UDF that trains and predicts with a model entirely on Snowpark and Snowflake compute. We will use regression here, while Part 1 used classification.

This article will also highlight some of the limitations I found when using Snowpark and Python UDFs.

I used Snowflake's guide on GitHub as the main framework for building this.

Import Libraries

import snowflake.connector
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import Window
from snowflake.snowpark.functions import udf, max, min, count, avg, sum, col, lit, listagg
import mlxtend
from mlxtend.feature_selection import ColumnSelector
import lightgbm as lgb
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
from sklearn import datasets

Data

We will be solving a regression problem, using the Diabetes dataset from sklearn's datasets (Efron et al., 2004). This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.

This allows for the sharing and adaptation of the datasets for any purpose, provided that the appropriate credit is given.

#Load the features
db = pd.DataFrame(datasets.load_diabetes().data, columns=datasets.load_diabetes().feature_names)
#Load the response variable
db['target'] = datasets.load_diabetes().target
#Convert column names to upper case in preparation for uploading to Snowflake
columnnames=[x.upper() for x in db.columns]
db.columns=columnnames
db.head()
Image by Author
print(datasets.load_diabetes().DESCR)
Image by Author
Image by Author

Let’s save the Diabetes data as a table in Snowflake. We can later connect to it using Snowpark:

#Set up the snowflake connection
ctx = snowflake.connector.connect(
    user='<user>',
    password='<password>',
    account='<account-identifier>',
    database='<database>',
    warehouse='<warehouse>',
    role='<role>',
    schema='<schema>'
)
#Create the input string for the diabetes data
snowflake_cols=[str(x) + ' float' for x in db.columns]
s = ', '.join(snowflake_cols)
#Create the empty table in Snowflake
ctx.cursor().execute(
"""CREATE OR REPLACE TABLE
DIABETES_DATA(""" + s + """)""")
#Copy the table into snowflake
write_pandas(ctx, db, 'DIABETES_DATA')

Connect to the data within Snowpark:

#Create snowpark session to connect to saved table. 
def create_session_object():
    connection_parameters = {
        "account": "<account-identifier>",
        "user": "<user>",
        "password": "<password>",
        "role": "<role>",
        "warehouse": "<warehouse>",
        "database": "<database>",
        "schema": "<schema>"
    }
    session = Session.builder.configs(connection_parameters).create()
    print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())
    return session

#Create two sessions, one for doing initial testing and one for pushing the udf to Snowpark.
#I've found the session tends to fail if I use just one session for both.
session = create_session_object()
session2 = create_session_object()
cols = session.table('DIABETES_DATA')
cols.schema.fields
Image by Author

Modeling

In my opinion, a crucial thing to remember when building a UDF is that the data has to be passed in as an array_construct(), i.e. a single column holding all features. This is similar to the features column input within Spark.

So, to prepare for our UDF, let's see if we can build a local model using a Snowpark array_construct as input. That way we will know whether our code will work in the UDF we upload to Snowpark.

columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct of the data to be used for modeling.
digits=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Collect the data from Snowpark
tbl=pd.DataFrame(digits.collect())
tbl.head()
Image by Author

You can see that reading it into our local environment and converting it to a DataFrame doesn't quite work. Everything stays in one column. Let's separate the data out using the str.split method in Python.

tbl=tbl['INPUT_DATA'].str.split(',\n', expand=True)
tbl.head()
Image by Author

We still have some funky newline characters and brackets, so let's get rid of those.

tbl[0]=[x.replace('\n','').replace('[  ','').replace(']','') for x in tbl[0]]
tbl[10]=[x.replace('\n','').replace('[  ','').replace(']','') for x in tbl[10]]
tbl.head()

Our data looks good now, let’s see if we can build out a model:

#Check NAs
count_nas=pd.DataFrame(tbl.isna().sum())
count_nas[count_nas[0]>0]
#Fill in NAs if there were any
tbl=tbl.fillna(0)
#Set the column names to what our original table in Snowflake has
tbl.columns=columns_list
#Split into features, target, training and validation
X=tbl.drop(columns='TARGET')
y=tbl['TARGET']
X_train, X_valid, y_train, y_valid = train_test_split(X, y, random_state=1234, test_size=.33)
#Build a pipeline
numeric_features=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6']
numeric_cols = Pipeline(steps=[
('selector', ColumnSelector(numeric_features))])
# Combine categorical and numerical pipeline with FeatureUnion
preprocessor = FeatureUnion([
('select_numeric_cols',numeric_cols)
])
pipe_feat_un = Pipeline(steps=[('preprocessor', preprocessor)])
#Light gbm
clf = make_pipeline(lgb.LGBMRegressor())
#Add the model to the pipeline
model = make_pipeline(pipe_feat_un, clf)
#Fit on the training data
model.fit(X_train,y_train)
#Predict on the validation data
preds=model.predict(X_valid)
#Look at the predictions
preds
Image by Author

These are what we would expect, predictions on a continuous range.

Now, let’s wrap this as a function so that we can upload it as a udf:

def lightgbm_train_predict_udf(db, columns_list):
    import pandas as pd
    import numpy as np
    import lightgbm as lgb
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
    from mlxtend.feature_selection import ColumnSelector
    import mlxtend
    #Read in the data
    tbl=pd.DataFrame(db, columns=['INPUT_DATA'])
    tbl=tbl['INPUT_DATA'].str.split(',\n', expand=True)
    tbl[0]=[x.replace('\n','').replace('[ ','').replace(']','') for x in tbl[0]]
    tbl[10]=[x.replace('\n','').replace('[ ','').replace(']','') for x in tbl[10]]
    #Fill in NAs if there are any
    tbl=tbl.fillna(0)
    #Change the column names to what our table in Snowflake has
    tbl.columns=columns_list
    #Split into features, target, training and validation
    X=tbl.drop(columns='TARGET')
    y=tbl['TARGET']
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, random_state=1234, test_size=.33)
    #Build a pipeline
    numeric_features=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6']
    numeric_cols = Pipeline(steps=[
        ('selector', ColumnSelector(numeric_features))])
    # Combine categorical and numerical pipeline with FeatureUnion
    preprocessor = FeatureUnion([
        ('select_numeric_cols', numeric_cols)
    ])
    pipe_feat_un = Pipeline(steps=[('preprocessor', preprocessor)])
    #Light gbm
    clf = make_pipeline(lgb.LGBMRegressor())
    #Add the model to the pipeline
    model = make_pipeline(pipe_feat_un, clf)
    #Fit on the training data
    model.fit(X_train, y_train)
    #Predict on the validation data
    preds = model.predict(X_valid)
    return(preds)

Let’s now upload this as a udf:

dep_imports=['/opt/anaconda3/lib/python3.8/site-packages/mlxtend']
lightgbm_train_predict_udf = session2.udf.register(lightgbm_train_predict_udf, name="lightgbm_train_predict_udf",
    is_permanent=True, stage_location='MODELSTAGE',
    packages=['numpy','scikit-learn','lightgbm','pandas'],
    imports=dep_imports, input_types=[T.ArrayType(), T.ArrayType()],
    return_type=T.ArrayType(), replace=True)

I want to call out the imports argument of the session.udf.register function. If there are any libraries you need that aren't available within Snowpark, you can easily add them. This adds convenient flexibility to using Snowpark. In this case, I am importing mlxtend, which allows me to use the ColumnSelector function within the pipeline.
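As an aside, Snowpark also exposes add_import and add_packages on the Session, so you can attach dependencies once at the session level instead of per registration. Here is a minimal sketch of that alternative; the mlxtend path is an assumption about where the package lives in your local environment:

#A session-level alternative (sketch): attach a locally installed package so any
#udf registered on this session can import it. The path below is an assumption.
session2.add_import('/opt/anaconda3/lib/python3.8/site-packages/mlxtend')
#Packages available in Snowflake's Anaconda channel can be declared once per session.
session2.add_packages('numpy', 'scikit-learn', 'lightgbm', 'pandas')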

Another key point to note here is that we can only use types as listed within the Snowpark Types Documentation. Specifically, we have to use an array construct as input for the udf.
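For reference, here is a minimal sketch of the only type declarations this article relies on; Snowpark's T.ArrayType() corresponds to Snowflake's ARRAY type, which is what array_construct() produces:

from snowflake.snowpark import types as T
#Both udf arguments (the data array and the column-name array) arrive as ARRAYs...
input_types = [T.ArrayType(), T.ArrayType()]
#...and the predictions are returned as an ARRAY as well.
return_type = T.ArrayType()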

Now, let’s test our uploaded udf:

columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct for input into the udf
diabetes=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Create an array construct of the column names to be fed into the function
input_column_names = F.array_construct(*[F.lit(x) for x in columns_list])
#Call the udf and look at the results
output=diabetes.select(F.call_udf('lightgbm_train_predict_udf', F.col('INPUT_DATA'), input_column_names).alias('ALL_ONE_UDF_PREDICTED')).collect()
output
Image by Author

This code fails at tbl=tbl['INPUT_DATA'].str.split(',\n', expand=True), which doesn't make sense, because the same line works in our local environment as first tested.

def lightgbm_train_predict_udf(db, columns_list):
    tbl=pd.DataFrame(db, columns=['INPUT_DATA'])
    tbl=tbl['INPUT_DATA'].str.split(',\n', expand=True)

Let's adjust this and test the udf again to see what it returns when we only run tbl=pd.DataFrame(db, columns=['INPUT_DATA']). Since we can only return the data types listed in Snowpark's documentation, the easiest one I have found is an np.array(). It converts to and from the Snowpark ArrayType() seamlessly. Returning a DataFrame does not work and is not supported.

def lightgbm_train_predict(diabetes, columns_list):
    import pandas as pd
    import numpy as np
    from lightgbm import LGBMRegressor
    from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
    from mlxtend.feature_selection import ColumnSelector
    import mlxtend
    tbl=pd.DataFrame(diabetes, columns=['INPUT_DATA'])
    return(np.array(tbl))

dep_imports=['/opt/anaconda3/lib/python3.8/site-packages/mlxtend']

#Register the udf to Snowflake/Snowpark
lightgbm_train_predict_udf = session2.udf.register(lightgbm_train_predict, name="lightgbm_train_predict_udf",
    is_permanent=True, stage_location='MODELSTAGE',
    packages=['numpy','scikit-learn','lightgbm','pandas'],
    imports=dep_imports, input_types=[T.ArrayType(), T.ArrayType()],
    return_type=T.ArrayType(), replace=True)
columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct for input into the udf
diabetes=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Create an array construct of the column names to be fed into the function
input_column_names = F.array_construct(*[F.lit(x) for x in columns_list])
#Call the udf and look at the results
output=diabetes.select(F.call_udf('lightgbm_train_predict_udf', F.col('INPUT_DATA'), input_column_names).alias('ALL_ONE_UDF_PREDICTED')).collect()
output

This returns what we would expect. But it means our split method won't work in the udf. This has led me to believe that there is some data type change in Snowpark that I can't figure out. My assumption is that it has something to do with the constraints of having Python running on Snowflake.

Setting up for another example ahead, let’s subset this array using:

output[0:1]
Image by Author

output[0:1] returns the first row of our array.

Moving on, below is the best workaround I have come up with to get the udf to work while reading in the data and formatting it for modeling.

def lightgbm_train_predict(db, columns_list):
    import pandas as pd
    import numpy as np
    db=np.array(db)
    objects=[pd.DataFrame(db[i:i+1]) for i in range(0,11)]
    tbl=pd.concat(objects, axis=1)
    #Fill in NAs if there are any
    tbl=tbl.fillna(0)
    #Change the column names to what our table in Snowflake has
    tbl.columns=columns_list
    return(np.array(tbl))

dep_imports=['/opt/anaconda3/lib/python3.8/site-packages/mlxtend']
#Register the udf to Snowpark/Snowflake
lightgbm_train_predict_udf = session2.udf.register(lightgbm_train_predict, name="lightgbm_train_predict_udf",
    is_permanent=True, stage_location='MODELSTAGE',
    packages=['numpy','scikit-learn','lightgbm','pandas'],
    imports=dep_imports, input_types=[T.ArrayType(), T.ArrayType()],
    return_type=T.ArrayType(), replace=True)
columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct for input into the udf
diabetes=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Create an array construct of the column names to be fed into the function
input_column_names = F.array_construct(*[F.lit(x) for x in columns_list])
#Call the udf and look at the results
output=diabetes.select(F.call_udf('lightgbm_train_predict_udf', F.col('INPUT_DATA'), input_column_names).alias('ALL_ONE_UDF_PREDICTED')).collect()
output
Image by Author

Because this returned an array of all our data, we know that converting it to a DataFrame and renaming all the columns worked. Otherwise, it would have errored out.

The biggest callout from the code above is these three lines:

def lightgbm_train_predict(diabetes, columns_list):
    diabetes=np.array(diabetes)
    objects=[pd.DataFrame(diabetes[i:i+1]) for i in range(0,11)]

Once we convert it to an np.array inside the udf, subsetting it with diabetes[0:1] returns the first column, while when we bring it to our local environment and then subset output[0:1], it returns the first row. I'm really not sure why Snowpark's Python instance does this, but I wish it wouldn't. Figuring out this difference was like putting on a blindfold and trying to solve a Rubik's cube based on how the colors feel. I don't recommend it, nor do I ever want to do it again.
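To make that difference concrete, here is a minimal local sketch (plain numpy with made-up values, not Snowpark code) of one plausible reading: locally the collected output is two-dimensional, so [0:1] slices a row, whereas inside the udf each call receives a single row's array of values, so [i:i+1] walks across the features, i.e. the columns of the original table.

import numpy as np
#Locally, the collected output behaves like rows of records, so [0:1] is the first row.
local_output = np.array([[151.0, 75.0, 141.0], [206.0, 135.0, 97.0]])
print(local_output[0:1])   #-> [[151.  75. 141.]]
#Inside the udf, the argument is assumed to be one row's array of 11 values,
#so [i:i+1] returns the i-th value, which lines up with the i-th column.
row_values = np.array([0.04, 0.05, 0.06, 0.02, -0.04, -0.03, -0.04, -0.01, 0.02, -0.02, 151.0])
print(row_values[0:1])     #-> [0.04]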

Let’s try to re-register the udf and build the model:

def lightgbm_train_predict(db, columns_list):
    import pandas as pd
    import numpy as np
    import lightgbm as lgb
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
    from mlxtend.feature_selection import ColumnSelector
    import mlxtend
    #Read in the data
    db=np.array(db)
    objects=[pd.DataFrame(db[i:i+1]) for i in range(0,11)]
    tbl=pd.concat(objects, axis=1)
    #Fill in NAs if there are any
    tbl=tbl.fillna(0)
    #Change the column names to what our table in Snowflake has
    tbl.columns=columns_list
    #Split into features, target, training and validation
    X=tbl.drop(columns='TARGET')
    y=tbl['TARGET']
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, random_state=1234, test_size=.33)
    #Build a pipeline
    numeric_features=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6']
    numeric_cols = Pipeline(steps=[
        ('selector', ColumnSelector(numeric_features))])
    # Combine categorical and numerical pipeline with FeatureUnion
    preprocessor = FeatureUnion([
        ('select_numeric_cols', numeric_cols)
    ])
    pipe_feat_un = Pipeline(steps=[('preprocessor', preprocessor)])
    #Light gbm
    clf = make_pipeline(lgb.LGBMRegressor())
    #Add the model to the pipeline
    model = make_pipeline(pipe_feat_un, clf)
    #Fit on the training data
    model.fit(X_train, y_train)
    #Predict on the validation data
    preds=model.predict(X_valid)
    return(preds)
dep_imports=['/opt/anaconda3/lib/python3.8/site-packages/mlxtend']
#Register the udf to Snowpark/Snowflake
lightgbm_train_predict_udf = session2.udf.register(lightgbm_train_predict, name="lightgbm_train_predict_udf",
    is_permanent=True, stage_location='MODELSTAGE',
    packages=['numpy','scikit-learn','lightgbm','pandas'],
    imports=dep_imports, input_types=[T.ArrayType(), T.ArrayType()],
    return_type=T.ArrayType(), replace=True)
columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct for input into the udf
diabetes=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Create an array construct of the column names to be fed into the function
input_column_names = F.array_construct(*[F.lit(x) for x in columns_list])
#Call the udf and look at the results
output=diabetes.select(F.call_udf('lightgbm_train_predict_udf', F.col('INPUT_DATA'), input_column_names).alias('ALL_ONE_UDF_PREDICTED')).collect()
output
Image by Author

Yay, another error. This one comes from sklearn's train_test_split function. For whatever reason, train_test_split does not work within Snowpark. There was also an error when using LightGBM, but I will skip that example here.

My workaround for these errors is to split into training and test sets manually using np.random.rand and to use XGBoost instead. This is my final solution, and I am not sure even this works correctly, although it does complete the process and predict without raising errors.

def xgboost_train_predict(db, columns_list):
    import pandas as pd
    import numpy as np
    from xgboost import XGBRegressor
    from sklearn.model_selection import GridSearchCV, train_test_split
    from sklearn.preprocessing import MultiLabelBinarizer, OneHotEncoder, FunctionTransformer
    from sklearn.pipeline import make_pipeline, Pipeline, FeatureUnion
    from sklearn.compose import ColumnTransformer
    from mlxtend.feature_selection import ColumnSelector
    import mlxtend
    #Read in the data
    db=np.array(db)
    objects=[pd.DataFrame(db[i:i+1]) for i in range(0,11)]
    tbl=pd.concat(objects, axis=1)
    #Fill in NAs if there are any
    tbl=tbl.fillna(1)
    #Change the column names to what our table in Snowflake has
    tbl.columns=columns_list

    #Split into training and validation manually instead of using train_test_split
    tbl['prob'] = np.random.rand(len(tbl))
    tbl['counter'] = [1 if x <= .65 else 0 for x in tbl['prob']]
    training_data = tbl.loc[tbl['counter']==1,].reset_index(drop=True)
    testing_data = tbl.loc[tbl['counter']==0,].reset_index(drop=True)
    X_train=training_data.drop(columns=['TARGET','prob','counter'])
    y_train=training_data['TARGET']
    X_valid=testing_data.drop(columns=['TARGET','prob','counter'])
    y_valid=testing_data['TARGET']

    #Build a pipeline
    numeric_features=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6']
    numeric_cols = Pipeline(steps=[
        ('selector', ColumnSelector(numeric_features))])
    # Combine categorical and numerical pipeline with FeatureUnion
    preprocessor = FeatureUnion([
        ('select_numeric_cols', numeric_cols)
    ])
    pipe_feat_un = Pipeline(steps=[('preprocessor', preprocessor)])
    #Xgboost
    clf = make_pipeline(XGBRegressor(n_estimators=5))
    #Add the model to the pipeline
    model = make_pipeline(pipe_feat_un, clf)
    #Fit on the training data
    model.fit(X_train, y_train)
    #Predict on the validation data
    preds=model.predict(X_valid)
    return(preds)
dep_imports=['/opt/anaconda3/lib/python3.8/site-packages/mlxtend']
xgboost_train_predict_udf = session2.udf.register(xgboost_train_predict, name="xgboost_train_predict_udf",
    is_permanent=True, stage_location='MODELSTAGE',
    packages=['numpy','scikit-learn','xgboost','pandas'],
    imports=dep_imports, input_types=[T.ArrayType(), T.ArrayType()],
    return_type=T.ArrayType(), replace=True)
columns_list=['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6','TARGET']
#Connect to the table in Snowflake and create an array construct for input into the udf
diabetes=session2.table('DIABETES_DATA').select(F.array_construct(*columns_list).alias('INPUT_DATA'))
#Create an array construct of the column names to be fed into the function
input_column_names = F.array_construct(*[F.lit(x) for x in columns_list])
#Call the udf and look at the results
output=diabetes.select(F.call_udf('xgboost_train_predict_udf', F.col('INPUT_DATA'), input_column_names).alias('ALL_ONE_UDF_PREDICTED')).collect()
output
Image by Author

Looking at these results, there are clear errors in how Snowpark is handling the model. First, not every resulting prediction should be .5; the predictions should fall on a continuous range, as they did when we built the model locally. Second, it returns empty arrays, because Snowpark won't reindex dataframes or arrays, even with DataFrame.reset_index(drop=True), during the training and test split. See the code below:

training_data = tbl.loc[tbl['counter']==1,].reset_index(drop=True)
testing_data = tbl.loc[tbl['counter']==0,].reset_index(drop=True)
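For contrast, here is a quick local check (a toy sketch with made-up values) showing that in a plain pandas environment reset_index(drop=True) does renumber the filtered rows, which is the behavior the udf did not appear to reproduce:

import pandas as pd
#Toy data standing in for the real table.
tbl = pd.DataFrame({'TARGET': [151.0, 75.0, 141.0, 206.0], 'counter': [1, 0, 1, 0]})
training_data = tbl.loc[tbl['counter']==1,].reset_index(drop=True)
print(training_data.index.tolist())   #-> [0, 1] locally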

Conclusion:

I think it is better to build a model locally and then send it up to Snowflake's Snowpark as a UDF, as we did in Part 1 of this series. My opinion is that Snowpark's Python is still in its early stages and doesn't yet support many of the functions that a local Python environment has.

Functions that raise errors within Snowpark's Python but work in my local environment using the exact same process and data:

  1. Identical array subsetting/dimensional behavior. For example, output[0:1] from the example above returns the first row in the local environment, but the first column within Snowpark
  2. Converting from Snowpark arrays or dataframes to pandas dataframes
  3. Sklearn's train_test_split()
  4. LightGBM
  5. Reindexing dataframes or arrays
  6. TfidfVectorizer() and CountVectorizer()

Functionalities I wish UDFs had in Snowpark:

  1. The option to return dataframes as the result of an uploaded UDF
  2. The ability to return a model as the output of an uploaded UDF

Overall, I see the usability of Snowpark and I am excited for the changes and improvements to come. If Snowflake fixes these errors I will post an updated guide with a working example.

References:

Bradley Efron, Trevor Hastie, Iain Johnstone and Robert Tibshirani (2004) “Least Angle Regression,” Annals of Statistics (with discussion), 407–499. (https://web.stanford.edu/~hastie/Papers/LARS/LeastAngle_2002.pdf)

