Techno Blender
Digitally Yours.

Deploying Multiple Models with SageMaker Pipelines | by Ram Vegiraju | Mar, 2023

0 107


Applying MLOps best practices to advanced serving options

Image from Unsplash by Growtika

MLOps is an essential practice to productionizing your Machine Learning workflows. With MLOps you can establish workflows that are catered for the ML lifecycle. These make it easier to centrally maintain resources, update/track models, and in general simplify the process as your ML experimentation scales up.

A key MLOps tool within the Amazon SageMaker ecosystem is SageMaker Pipelines. With SageMaker Pipelines you can define workflows that are composed of different defined ML steps. You can also structure these workflows by defining parameters that you can inject as variables into your Pipeline. For a more general introduction to SageMaker Pipelines, please refer to the linked article.

Defining a Pipeline in itself is not heavily complicated, but there’s a few advanced use-cases that need some extra configuring. Specifically, say that you are training multiple models that are needed for inference in your ML use-case. Within SageMaker there is a hosting option known as Multi-Model Endpoints (MME) where you can host several models on a singular endpoint and invoke a target model. However, within SageMaker Pipelines there’s no native support for defining or deploying a MME natively at the moment. In this blog post we’ll take a look at how we can utilize a Pipelines Lambda Step to deploy a Multi-Model Endpoint in a custom manner, while adhering to MLOPs best practices.

NOTE: For those of you new to AWS, make sure you make an account at the following link if you want to follow along. The article also assumes an intermediate understanding of SageMaker Deployment, I would suggest following this article for understanding Deployment/Inference more in depth. In particular, for SageMaker Multi-Model Endpoints I would refer to the following blog.

Setup

For this example, we will be working in SageMaker Studio, where we have access to the visual interfaces for SageMaker Pipelines and other SageMaker components. For development we will be utilizing a Studio Notebook Instance with a Data Science Kernel on an ml.t3.medium instance. To get started we need to first import the necessary libraries for the different steps we will be utilizing within SageMaker Pipelines.

import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd

from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator

# Custom Lambda Step
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.pipeline import Pipeline

Next we create a Pipeline Session, this Pipeline Session ensures none of the training jobs are actually executed within the notebook until the Pipeline itself is executed.

pipeline_session = PipelineSession()

For this example we’ll utilize the Abalone dataset (CC BY 4.0) and run a SageMaker XGBoost algorithm on it for a regression model. You can download the dataset from the publicly available Amazon datasets.

!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)

We can then parameterize our Pipeline by defining defaults for both the training dataset and instance type.

training_input_param = ParameterString(
name = "training_input",
default_value=training_path,
)

training_instance_param = ParameterString(
name = "training_instance",
default_value = "ml.c5.xlarge")

We then also retrieve the AWS provided container for XGBoost that we will be utilizing for training and inference.

model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type=training_instance_param,
)

image_uri

Training Setup

For the training portion of our Pipeline we will be configuring the SageMaker XGBoost algorithm for our regression Abalone dataset.

xgb_train_one = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)

xgb_train_one.set_hyperparameters(
objective="reg:linear",
num_round=40,
max_depth=4,
eta=0.1,
gamma=3,
min_child_weight=5,
subsample=0.6,
silent=0,
)

For our second estimator we then change our hyperparameters to adjust our model training so we have two separate models behind our Multi-Model Endpoint.

xgb_train_two = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)

#adjusting hyperparams
xgb_train_two.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
silent=0,
)

We then configure our training inputs for both estimators to point towards the parameter we defined for our S3 training dataset.

train_args_one = xgb_train_one.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)

train_args_two = xgb_train_two.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)

We then define two separate Training Steps that will be executed in parallel via our Pipeline.

step_train_one = TrainingStep(
name="TrainOne",
step_args=train_args_one,
)

step_train_two = TrainingStep(
name = "TrainTwo",
step_args= train_args_two
)

Lambda Step

A Lambda Step essentially allows you to plug in a Lambda function within your Pipeline. For every SageMaker Training Job a model.tar.gz is emitted containing the trained model artifacts. Here we will utilize the Lambda step to retrieve the trained model artifacts and deploy them to a SageMaker Multi-Model Endpoint.

Before we can do that we need to give our Lambda function proper permissions to work with SageMaker. We can use the following existing script to create an IAM Role for our Lambda Function.

import boto3
import json

iam = boto3.client("iam")

def create_lambda_role(role_name):
try:
response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
),
Description="Role for Lambda to call SageMaker functions",
)

role_arn = response["Role"]["Arn"]

response = iam.attach_role_policy(
RoleName=role_name,
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

response = iam.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", RoleName=role_name
)

return role_arn

except iam.exceptions.EntityAlreadyExistsException:
print(f"Using ARN from existing role: {role_name}")
response = iam.get_role(RoleName=role_name)
return response["Role"]["Arn"]

from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")

After we’ve defined our Lambda role we can create a Lambda function that does a few things for us:

  • Takes each individual model.tar.gz from each training job and places it into a central S3 location containing both tarballs. For MME they expect all model tarballs to be in one singular S3 path.
  • Utilizes the boto3 client with SageMaker to create a SageMaker Model, Endpoint Configuration, and Endpoint.

We can utilize the following helper functions to achieve the first task, by copying the training job artifacts into a central S3 location with both model tarballs.

sm_client = boto3.client("sagemaker")
s3 = boto3.resource('s3')

def extract_bucket_key(model_data):
"""
Extracts the bucket and key from the model data tarballs that we are passing in
"""
bucket = model_data.split('/', 3)[2]
key = model_data.split('/', 3)[-1]
return [bucket, key]

def create_mme_dir(model_data_dir):
"""
Takes in a list of lists with the different trained models,
creates a central S3 bucket/key location with all model artifacts for MME.
"""
bucket_name = model_data_dir[0][0]
for i, model_data in enumerate(model_data_dir):
copy_source = {
'Bucket': bucket_name,
'Key': model_data[1]
}
bucket = s3.Bucket(bucket_name)
destination_key = 'xgboost-mme-pipelines/model-{}.tar.gz'.format(i)
bucket.copy(copy_source, destination_key)
mme_s3_path = 's3://{}/xgboost-mme-pipelines/'.format(bucket_name)
return mme_s3_path

The next steps for our Lambda function will be creating the necessary SageMaker entities for creating a real-time endpoint:

  • SageMaker Model: Contains the model data and container image, also defines Multi-Model vs Single Model endpoint.
  • SageMaker Endpoint Configuration: Defines the hardware behind an endpoint, the instance type and count.
  • SageMaker Endpoint: Your REST endpoint that you can invoke for inference, for MME you also specify the model that you want to perform inference against.
    model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_model_response = sm_client.create_model(
ModelName=model_name,
Containers=[
{
"Image": image_uri,
"Mode": "MultiModel",
"ModelDataUrl": model_url
}
],
#to-do parameterize this
ExecutionRoleArn='arn:aws:iam::474422712127:role/sagemaker-role-BYOC',
)
print("Model Arn: " + create_model_response["ModelArn"])

#Step 2: EPC Creation
xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=xgboost_epc_name,
ProductionVariants=[
{
"VariantName": "xgbvariant",
"ModelName": model_name,
"InstanceType": "ml.c5.large",
"InitialInstanceCount": 1
},
],
)
print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])

#Step 3: EP Creation
endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=xgboost_epc_name,
)
print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

We return a successful message with our Lambda function once we are able to start creating an endpoint.

return {
"statusCode": 200,
"body": json.dumps("Created Endpoint!"),
"endpoint_name": endpoint_name
}

We then define this Lambda function in the necessary Lambda Step format for our Pipeline to pick up on.

# Lambda helper class can be used to create the Lambda function
func = Lambda(
function_name=function_name,
execution_role_arn=lambda_role,
script="code/lambda_helper.py",
handler="lambda_helper.lambda_handler",
)

We also define what we are returning from the Lambda in the form of output parameters.

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="endpoint_name", output_type=LambdaOutputTypeEnum.String)

We then define our inputs with the two different trained model artifacts from the training steps that we defined earlier in our notebook.

step_deploy_lambda = LambdaStep(
name="LambdaStep",
lambda_func=func,
inputs={
"model_artifacts_one": step_train_one.properties.ModelArtifacts.S3ModelArtifacts,
"model_artifacts_two": step_train_two.properties.ModelArtifacts.S3ModelArtifacts
},
outputs=[output_param_1, output_param_2, output_param_3],
)

Pipeline Execution & Sample Inference

Now that we have our different steps configured we can stitch all of this together into a singular Pipeline. We point towards our three different steps and the different parameters we defined. Note that you can also define further parameters than we did here depending on your use case.

pipeline = Pipeline(
name="mme-pipeline",
steps=[step_train_one, step_train_two, step_deploy_lambda],
parameters= [training_input_param, training_instance_param]
)

We can now execute the Pipeline with the following commands.

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

Post execution we notice that in the Studio UI for the Pipelines tab a Directed Acylic Graph (DAG) has been created for your Pipeline to display your workflow.

MME DAG (Screenshot by Author)

After a few minutes you should also see an endpoint has been created in the SageMaker Console.

Endpoint Created (Screenshot by Author)

We can then test this endpoint with a sample inference to ensure it’s working properly.

import boto3
smr = boto3.client('sagemaker-runtime') #client for inference

#specify the tarball you are invoking in the TargetModel param
resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0',
ContentType='text/csv', TargetModel = 'model-0.tar.gz')

print(resp['Body'].read())

Additional Resources & Conclusion

The code for the entire example can be found at the link above (stay tuned for more Pipelines examples). This example combines an advanced hosting option with MLOPs best practices. It’s crucial to utilize MLOPs tools as you scale up your ML experimentation as it helps simplify and parameterize your efforts so that it’s easier for teams to collaborate and track. I hope this article was a good overview of using Pipelines for a specific Hosting use-case in MME. As always all feedback is appreciated, thank you for reading!


Applying MLOps best practices to advanced serving options

Image from Unsplash by Growtika

MLOps is an essential practice to productionizing your Machine Learning workflows. With MLOps you can establish workflows that are catered for the ML lifecycle. These make it easier to centrally maintain resources, update/track models, and in general simplify the process as your ML experimentation scales up.

A key MLOps tool within the Amazon SageMaker ecosystem is SageMaker Pipelines. With SageMaker Pipelines you can define workflows that are composed of different defined ML steps. You can also structure these workflows by defining parameters that you can inject as variables into your Pipeline. For a more general introduction to SageMaker Pipelines, please refer to the linked article.

Defining a Pipeline in itself is not heavily complicated, but there’s a few advanced use-cases that need some extra configuring. Specifically, say that you are training multiple models that are needed for inference in your ML use-case. Within SageMaker there is a hosting option known as Multi-Model Endpoints (MME) where you can host several models on a singular endpoint and invoke a target model. However, within SageMaker Pipelines there’s no native support for defining or deploying a MME natively at the moment. In this blog post we’ll take a look at how we can utilize a Pipelines Lambda Step to deploy a Multi-Model Endpoint in a custom manner, while adhering to MLOPs best practices.

NOTE: For those of you new to AWS, make sure you make an account at the following link if you want to follow along. The article also assumes an intermediate understanding of SageMaker Deployment, I would suggest following this article for understanding Deployment/Inference more in depth. In particular, for SageMaker Multi-Model Endpoints I would refer to the following blog.

Setup

For this example, we will be working in SageMaker Studio, where we have access to the visual interfaces for SageMaker Pipelines and other SageMaker components. For development we will be utilizing a Studio Notebook Instance with a Data Science Kernel on an ml.t3.medium instance. To get started we need to first import the necessary libraries for the different steps we will be utilizing within SageMaker Pipelines.

import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd

from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator

# Custom Lambda Step
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.pipeline import Pipeline

Next we create a Pipeline Session, this Pipeline Session ensures none of the training jobs are actually executed within the notebook until the Pipeline itself is executed.

pipeline_session = PipelineSession()

For this example we’ll utilize the Abalone dataset (CC BY 4.0) and run a SageMaker XGBoost algorithm on it for a regression model. You can download the dataset from the publicly available Amazon datasets.

!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)

We can then parameterize our Pipeline by defining defaults for both the training dataset and instance type.

training_input_param = ParameterString(
name = "training_input",
default_value=training_path,
)

training_instance_param = ParameterString(
name = "training_instance",
default_value = "ml.c5.xlarge")

We then also retrieve the AWS provided container for XGBoost that we will be utilizing for training and inference.

model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type=training_instance_param,
)

image_uri

Training Setup

For the training portion of our Pipeline we will be configuring the SageMaker XGBoost algorithm for our regression Abalone dataset.

xgb_train_one = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)

xgb_train_one.set_hyperparameters(
objective="reg:linear",
num_round=40,
max_depth=4,
eta=0.1,
gamma=3,
min_child_weight=5,
subsample=0.6,
silent=0,
)

For our second estimator we then change our hyperparameters to adjust our model training so we have two separate models behind our Multi-Model Endpoint.

xgb_train_two = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)

#adjusting hyperparams
xgb_train_two.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
silent=0,
)

We then configure our training inputs for both estimators to point towards the parameter we defined for our S3 training dataset.

train_args_one = xgb_train_one.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)

train_args_two = xgb_train_two.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)

We then define two separate Training Steps that will be executed in parallel via our Pipeline.

step_train_one = TrainingStep(
name="TrainOne",
step_args=train_args_one,
)

step_train_two = TrainingStep(
name = "TrainTwo",
step_args= train_args_two
)

Lambda Step

A Lambda Step essentially allows you to plug in a Lambda function within your Pipeline. For every SageMaker Training Job a model.tar.gz is emitted containing the trained model artifacts. Here we will utilize the Lambda step to retrieve the trained model artifacts and deploy them to a SageMaker Multi-Model Endpoint.

Before we can do that we need to give our Lambda function proper permissions to work with SageMaker. We can use the following existing script to create an IAM Role for our Lambda Function.

import boto3
import json

iam = boto3.client("iam")

def create_lambda_role(role_name):
try:
response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
),
Description="Role for Lambda to call SageMaker functions",
)

role_arn = response["Role"]["Arn"]

response = iam.attach_role_policy(
RoleName=role_name,
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

response = iam.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", RoleName=role_name
)

return role_arn

except iam.exceptions.EntityAlreadyExistsException:
print(f"Using ARN from existing role: {role_name}")
response = iam.get_role(RoleName=role_name)
return response["Role"]["Arn"]

from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")

After we’ve defined our Lambda role we can create a Lambda function that does a few things for us:

  • Takes each individual model.tar.gz from each training job and places it into a central S3 location containing both tarballs. For MME they expect all model tarballs to be in one singular S3 path.
  • Utilizes the boto3 client with SageMaker to create a SageMaker Model, Endpoint Configuration, and Endpoint.

We can utilize the following helper functions to achieve the first task, by copying the training job artifacts into a central S3 location with both model tarballs.

sm_client = boto3.client("sagemaker")
s3 = boto3.resource('s3')

def extract_bucket_key(model_data):
"""
Extracts the bucket and key from the model data tarballs that we are passing in
"""
bucket = model_data.split('/', 3)[2]
key = model_data.split('/', 3)[-1]
return [bucket, key]

def create_mme_dir(model_data_dir):
"""
Takes in a list of lists with the different trained models,
creates a central S3 bucket/key location with all model artifacts for MME.
"""
bucket_name = model_data_dir[0][0]
for i, model_data in enumerate(model_data_dir):
copy_source = {
'Bucket': bucket_name,
'Key': model_data[1]
}
bucket = s3.Bucket(bucket_name)
destination_key = 'xgboost-mme-pipelines/model-{}.tar.gz'.format(i)
bucket.copy(copy_source, destination_key)
mme_s3_path = 's3://{}/xgboost-mme-pipelines/'.format(bucket_name)
return mme_s3_path

The next steps for our Lambda function will be creating the necessary SageMaker entities for creating a real-time endpoint:

  • SageMaker Model: Contains the model data and container image, also defines Multi-Model vs Single Model endpoint.
  • SageMaker Endpoint Configuration: Defines the hardware behind an endpoint, the instance type and count.
  • SageMaker Endpoint: Your REST endpoint that you can invoke for inference, for MME you also specify the model that you want to perform inference against.
    model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_model_response = sm_client.create_model(
ModelName=model_name,
Containers=[
{
"Image": image_uri,
"Mode": "MultiModel",
"ModelDataUrl": model_url
}
],
#to-do parameterize this
ExecutionRoleArn='arn:aws:iam::474422712127:role/sagemaker-role-BYOC',
)
print("Model Arn: " + create_model_response["ModelArn"])

#Step 2: EPC Creation
xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=xgboost_epc_name,
ProductionVariants=[
{
"VariantName": "xgbvariant",
"ModelName": model_name,
"InstanceType": "ml.c5.large",
"InitialInstanceCount": 1
},
],
)
print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])

#Step 3: EP Creation
endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=xgboost_epc_name,
)
print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

We return a successful message with our Lambda function once we are able to start creating an endpoint.

return {
"statusCode": 200,
"body": json.dumps("Created Endpoint!"),
"endpoint_name": endpoint_name
}

We then define this Lambda function in the necessary Lambda Step format for our Pipeline to pick up on.

# Lambda helper class can be used to create the Lambda function
func = Lambda(
function_name=function_name,
execution_role_arn=lambda_role,
script="code/lambda_helper.py",
handler="lambda_helper.lambda_handler",
)

We also define what we are returning from the Lambda in the form of output parameters.

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="endpoint_name", output_type=LambdaOutputTypeEnum.String)

We then define our inputs with the two different trained model artifacts from the training steps that we defined earlier in our notebook.

step_deploy_lambda = LambdaStep(
name="LambdaStep",
lambda_func=func,
inputs={
"model_artifacts_one": step_train_one.properties.ModelArtifacts.S3ModelArtifacts,
"model_artifacts_two": step_train_two.properties.ModelArtifacts.S3ModelArtifacts
},
outputs=[output_param_1, output_param_2, output_param_3],
)

Pipeline Execution & Sample Inference

Now that we have our different steps configured we can stitch all of this together into a singular Pipeline. We point towards our three different steps and the different parameters we defined. Note that you can also define further parameters than we did here depending on your use case.

pipeline = Pipeline(
name="mme-pipeline",
steps=[step_train_one, step_train_two, step_deploy_lambda],
parameters= [training_input_param, training_instance_param]
)

We can now execute the Pipeline with the following commands.

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

Post execution we notice that in the Studio UI for the Pipelines tab a Directed Acylic Graph (DAG) has been created for your Pipeline to display your workflow.

MME DAG (Screenshot by Author)

After a few minutes you should also see an endpoint has been created in the SageMaker Console.

Endpoint Created (Screenshot by Author)

We can then test this endpoint with a sample inference to ensure it’s working properly.

import boto3
smr = boto3.client('sagemaker-runtime') #client for inference

#specify the tarball you are invoking in the TargetModel param
resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0',
ContentType='text/csv', TargetModel = 'model-0.tar.gz')

print(resp['Body'].read())

Additional Resources & Conclusion

The code for the entire example can be found at the link above (stay tuned for more Pipelines examples). This example combines an advanced hosting option with MLOPs best practices. It’s crucial to utilize MLOPs tools as you scale up your ML experimentation as it helps simplify and parameterize your efforts so that it’s easier for teams to collaborate and track. I hope this article was a good overview of using Pipelines for a specific Hosting use-case in MME. As always all feedback is appreciated, thank you for reading!

FOLLOW US ON GOOGLE NEWS

Read original article here

Denial of responsibility! Techno Blender is an automatic aggregator of the all world’s media. In each content, the hyperlink to the primary source is specified. All trademarks belong to their rightful owners, all materials to their authors. If you are the owner of the content and do not want us to publish your materials, please contact us by email – [email protected]. The content will be deleted within 24 hours.

Leave a comment