Overview: Streamlining MLOps with SageMaker Pipelines, Model Registry, and A/B Deployment
In the rapidly evolving landscape of artificial intelligence, the journey from model development to production deployment is often fraught with complexities. Machine Learning Operations (MLOps) aims to bridge this gap, bringing DevOps principles to the ML lifecycle to ensure reproducibility, reliability, and scalability. AWS SageMaker offers a comprehensive suite of tools that are instrumental in achieving robust MLOps. Among these, SageMaker Pipelines stand out as a powerful orchestration service, allowing data scientists and engineers to build, automate, and manage end-to-end machine learning workflows. Coupled with the SageMaker Model Registry for versioning and governance, and the flexible A/B deployment capabilities for controlled rollouts, SageMaker provides a formidable platform for advanced MLOps.
This article delves into how these core SageMaker components integrate to create a seamless MLOps workflow. We'll explore a practical scenario where a machine learning model is trained, evaluated, and conditionally registered using SageMaker Pipelines. Subsequently, we will demonstrate how to deploy different model versions from the Model Registry to an A/B test endpoint, enabling controlled experimentation and performance monitoring in a production environment. This approach allows teams to iterate quickly, maintain high model quality, and ensure business continuity by minimizing risks associated with new model deployments.
Prerequisites
Before embarking on this journey, ensure you have the following prerequisites in place:
- An active AWS account with appropriate administrative permissions to create IAM roles, S3 buckets, and SageMaker resources.
- AWS CLI installed and configured with credentials for your AWS account.
- Python 3.8+ installed on your local machine or development environment (e.g., a SageMaker Notebook instance).
- The `boto3` and `sagemaker` SDKs installed:
pip install boto3 sagemaker==2.x --upgrade
- Familiarity with basic AWS concepts (S3, IAM, EC2) and SageMaker fundamentals.
Step-by-step Implementation
We'll walk through the process of setting up our environment, defining our ML pipeline, and deploying models for A/B testing.
1. Setup and IAM Roles
First, we need to set up an S3 bucket for storing our data and model artifacts, and an IAM role that SageMaker can assume to access various AWS services.
Create S3 Bucket
This bucket will serve as the central repository for our dataset, processing scripts, training scripts, and model artifacts.
aws s3 mb s3://tech-news-venture-mlops-bucket-12345 --region us-east-1
Create IAM Role for SageMaker
This role needs permissions for SageMaker to perform operations like launching instances, accessing S3, interacting with ECR (for Docker images), and logging to CloudWatch. We will create an execution role for SageMaker and attach the necessary policies.
# 1. Create a trust policy JSON file (e.g., sagemaker-trust-policy.json)
cat > sagemaker-trust-policy.json << EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sagemaker.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
# 2. Create the IAM role
aws iam create-role \
--role-name sagemaker-mlops-execution-role \
--assume-role-policy-document file://sagemaker-trust-policy.json
# 3. Attach required policies
# SageMaker full access (for simplicity in this example, but use least privilege in production)
aws iam attach-role-policy \
--role-name sagemaker-mlops-execution-role \
--policy-arn arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
# S3 read/write access to our specific bucket
# First, create a custom policy JSON (e.g., s3-custom-policy.json)
ACCOUNT_ID="123456789012" # Replace with your AWS Account ID
S3_BUCKET_NAME="tech-news-venture-mlops-bucket-12345"
cat > s3-custom-policy.json << EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::${S3_BUCKET_NAME}/*",
"arn:aws:s3:::${S3_BUCKET_NAME}"
]
}
]
}
EOF
# Create the custom policy
aws iam create-policy \
--policy-name TechNewsVentureS3AccessPolicy \
--policy-document file://s3-custom-policy.json
# Attach the custom policy to the role
aws iam attach-role-policy \
--role-name sagemaker-mlops-execution-role \
--policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/TechNewsVentureS3AccessPolicy
# Store the ARN for later use
SAGEMAKER_ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/sagemaker-mlops-execution-role"
echo "SageMaker Role ARN: $SAGEMAKER_ROLE_ARN"
In a real-world production scenario, you would adhere to the principle of least privilege, creating more granular policies instead of `AmazonSageMakerFullAccess`.
2. Data Preparation with SageMaker Processing
Data preparation is a crucial first step. We'll use a SageMaker Processing job to preprocess our raw data. For this example, let's assume we have a simple CSV dataset that needs feature engineering and splitting into train/test sets.
Dummy Data and Processing Script
First, let's create a dummy dataset and a Python script for processing. We'll simulate a simple classification problem.
# data.csv (example content)
# feature1,feature2,target
# 10,20,0
# 15,25,1
# ...
# processing.py
import argparse
import os
import pandas as pd
from sklearn.model_selection import train_test_split
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input-data-path", type=str, default="/opt/ml/processing/input/data.csv")
parser.add_argument("--output-train-path", type=str, default="/opt/ml/processing/train")
parser.add_argument("--output-test-path", type=str, default="/opt/ml/processing/test")
args = parser.parse_args()
print(f"Reading input data from {args.input_data_path}")
df = pd.read_csv(args.input_data_path)
# Simulate some preprocessing (e.g., feature scaling, one-hot encoding)
# For simplicity, let's just split
X = df.drop("target", axis=1)
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
train_df = pd.concat([X_train, y_train], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)
os.makedirs(args.output_train_path, exist_ok=True)
os.makedirs(args.output_test_path, exist_ok=True)
train_output_path = os.path.join(args.output_train_path, "train.csv")
test_output_path = os.path.join(args.output_test_path, "test.csv")
print(f"Saving training data to {train_output_path}")
train_df.to_csv(train_output_path, index=False)
print(f"Saving testing data to {test_output_path}")
test_df.to_csv(test_output_path, index=False)
print("Data processing complete.")
Upload these to your S3 bucket:
# Create a dummy data.csv file locally
echo "feature1,feature2,target" > data.csv
echo "10,20,0" >> data.csv
echo "15,25,1" >> data.csv
echo "12,22,0" >> data.csv
echo "18,28,1" >> data.csv
echo "11,21,0" >> data.csv
echo "16,26,1" >> data.csv
echo "13,23,0" >> data.csv
echo "19,29,1" >> data.csv
aws s3 cp data.csv s3://tech-news-venture-mlops-bucket-12345/raw-data/data.csv
aws s3 cp processing.py s3://tech-news-venture-mlops-bucket-12345/scripts/processing.py
Define Processing Step in Pipeline
import sagemaker
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
# Initialize SageMaker session and defaults
session = sagemaker.Session()
bucket = "tech-news-venture-mlops-bucket-12345" # Replace with your bucket name
role = "arn:aws:iam::123456789012:role/sagemaker-mlops-execution-role" # Replace with your role ARN
region = session.boto_session.region_name
# Define pipeline parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
input_data_uri = ParameterString(
name="InputDataUri",
default_value=f"s3://{bucket}/raw-data/data.csv"
)
# Define a ScriptProcessor for data preprocessing
script_processor = ScriptProcessor(
image_uri=sagemaker.image_uris.get_sklearn_image_uri(region=region, version="1.0-1"),
command=["python3"],
instance_type=processing_instance_type,
instance_count=processing_instance_count,
role=role,
sagemaker_session=session
)
# Define the ProcessingStep
processing_step_args = script_processor.run(
outputs=[
ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train", destination=f"s3://{bucket}/processed-data/train"),
ProcessingOutput(output_name="test_data", source="/opt/ml/processing/test", destination=f"s3://{bucket}/processed-data/test")
],
code=f"s3://{bucket}/scripts/processing.py",
inputs=[
ProcessingInput(source=input_data_uri, destination="/opt/ml/processing/input")
]
)
step_process_data = ProcessingStep(
name="ProcessData",
step_args=processing_step_args
)
print("Processing step defined.")
3. Model Training with SageMaker Training Job
Next, we'll train a model using the processed data. We'll use a simple scikit-learn classifier for demonstration.
Training Script
# train.py
import argparse
import os
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--train-data-path", type=str, default="/opt/ml/input/data/train/train.csv")
parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
args = parser.parse_args()
print(f"Reading training data from {args.train_data_path}")
train_df = pd.read_csv(args.train_data_path)
X_train = train_df.drop("target", axis=1)
y_train = train_df["target"]
print("Training RandomForestClassifier...")
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print("Training complete.")
model_output_path = os.path.join(args.model_dir, "model.joblib")
print(f"Saving model to {model_output_path}")
joblib.dump(model, model_output_path)
print("Model saved.")
Upload the training script:
aws s3 cp train.py s3://tech-news-venture-mlops-bucket-12345/scripts/train.py
Define Training Step in Pipeline
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
# Define training parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# Define an SKLearn estimator for training
sklearn_estimator = SKLearn(
entry_point="train.py",
source_dir=f"s3://{bucket}/scripts",
role=role,
instance_count=1,
instance_type=training_instance_type,
framework_version="1.0-1",
py_version="py3",
sagemaker_session=session,
hyperparameters={"n_estimators": 100} # Example hyperparameter
)
# Define the TrainingStep
step_train_model = TrainingStep(
name="TrainModel",
estimator=sklearn_estimator,
inputs={
"train": TrainingInput(
s3_data=step_process_data.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
content_type="text/csv"
)
}
)
print("Training step defined.")
4. Model Evaluation and Registration
After training, we evaluate the model's performance on the test set and, if it meets a predefined threshold, register it into the SageMaker Model Registry.
Evaluation Script
# evaluate.py
import argparse
import os
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, f1_score
import json
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--model-path", type=str, default="/opt/ml/processing/model/model.joblib")
parser.add_argument("--test-data-path", type=str, default="/opt/ml/processing/test/test.csv")
parser.add_argument("--output-path", type=str, default="/opt/ml/processing/evaluation")
args = parser.parse_args()
print(f"Loading model from {args.model_path}")
model = joblib.load(args.model_path)
print(f"Reading test data from {args.test_data_path}")
test_df = pd.read_csv(args.test_data_path)
X_test = test_df.drop("target", axis=1)
y_test = test_df["target"]
print("Making predictions...")
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
f1 = f1_score(y_test, predictions, average="binary") # Adjust average for multi-class
print(f"Model Accuracy: {accuracy}")
print(f"Model F1 Score: {f1}")
evaluation_report = {
"classification_metrics": {
"accuracy": {"value": accuracy, "standard_deviation": "NaN"},
"f1_score": {"value": f1, "standard_deviation": "NaN"}
}
}
os.makedirs(args.output_path, exist_ok=True)
evaluation_file_path = os.path.join(args.output_path, "evaluation.json")
print(f"Saving evaluation report to {evaluation_file_path}")
with open(evaluation_file_path, "w") as f:
json.dump(evaluation_report, f)
print("Evaluation complete.")
Upload the evaluation script:
aws s3 cp evaluate.py s3://tech-news-venture-mlops-bucket-12345/scripts/evaluate.py
Define Evaluation and Registration Step in Pipeline
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_group import ModelPackageGroup
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import Processor
# Define pipeline session
pipeline_session = PipelineSession(sagemaker_session=session)
# Define evaluation instance parameters
evaluation_instance_type = ParameterString(name="EvaluationInstanceType", default_value="ml.m5.xlarge")
# Define a ScriptProcessor for model evaluation
evaluation_processor = ScriptProcessor(
image_uri=sagemaker.image_uris.get_sklearn_image_uri(region=region, version="1.0-1"),
command=["python3"],
instance_type=evaluation_instance_type,
instance_count=1,
role=role,
sagemaker_session=pipeline_session # Use pipeline_session
)
# Define the property file for evaluation metrics
evaluation_report = PropertyFile(
name="EvaluationReport",
output_name="evaluation",
path="evaluation.json"
)
# Define the ProcessingStep for evaluation
step_evaluate_model = ProcessingStep(
name="EvaluateModel",
processor=evaluation_processor,
inputs=[
ProcessingInput(
source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model"
),
ProcessingInput(
source=step_process_data.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
destination="/opt/ml/processing/test"
)
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
],
code=f"s3://{bucket}/scripts/evaluate.py",
property_files=[evaluation_report]
)
print("Evaluation step defined.")
# Define the ModelPackageGroup
model_package_group_name = "TechNewsVentureModelGroup"
model_package_group_input = ModelPackageGroup(
name=model_package_group_name,
role=role,
sagemaker_session=session,
model_package_group_description="Model package group for TechNews Venture classification models."
)
print(f"Model Package Group '{model_package_group_name}' defined.")
# Define the Model
model = Model(
image_uri=sagemaker.image_uris.get_sklearn_image_uri(region=region, version="1.0-1"),
model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role
)
# Define the RegisterModel step
min_accuracy_threshold = ParameterString(name="MinAccuracyThreshold", default_value="0.75")
# Conditional registration based on evaluation metrics
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
step_register_model = ModelStep(
name="RegisterModel",
step_args=model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status="PendingManualApproval", # Can be "Approved" for auto-deployment
model_metrics=sagemaker.model_metrics.ModelMetrics(
model_statistics=sagemaker.model_metrics.MetricsSource(
s3_uri=JsonGet(
step_name=step_evaluate_model.name,
json_path="classification_metrics"
),
content_type="application/json"
)
)
)
)
# Define the ConditionStep
condition_step = ConditionStep(
name="CheckModelAccuracy",
conditions=[
ConditionGreaterThanOrEqualTo(
left=JsonGet(
step_name=step_evaluate_model.name,
json_path="classification_metrics.accuracy.value"
),
right=min_accuracy_threshold
)
],
if_steps=[step_register_model],
else_steps=[] # Or define steps for failure, e.g., send notification
)
print("Model registration and conditional evaluation steps defined.")
5. Defining and Executing the SageMaker Pipeline
Now, we assemble all the steps into a SageMaker Pipeline and execute it.
from sagemaker.workflow.pipeline import Pipeline
# Define the pipeline
pipeline_name = "TechNewsVentureMLOpsPipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
input_data_uri,
processing_instance_type,
processing_instance_count,
training_instance_type,
evaluation_instance_type,
min_accuracy_threshold
],
steps=[step_process_data, step_train_model, step_evaluate_model, condition_step],
sagemaker_session=pipeline_session # Use pipeline_session
)
# Upsert (create or update) the pipeline
pipeline.upsert(role_arn=role)
print(f"Pipeline '{pipeline_name}' upserted.")
# Start a pipeline execution
execution = pipeline.start(
parameters={
"InputDataUri": f"s3://{bucket}/raw-data/data.csv",
"MinAccuracyThreshold": "0.70" # Lowering threshold for example
}
)
print(f"Pipeline execution started: {execution.arn}")
# Wait for the execution to complete (optional, for script-based execution)
# execution.wait()
# print(f"Pipeline execution status: {execution.describe()['PipelineExecutionStatus']}")
Once the pipeline completes, if the accuracy threshold is met, a new model version will be registered in the Model Registry with `PendingManualApproval` status. You can manually approve it in the SageMaker console under "Model groups" -> "TechNewsVentureModelGroup".
6. Model Deployment and A/B Testing
After a model version is approved in the Model Registry, we can deploy it to a SageMaker endpoint. For A/B testing, we'll deploy two different model versions (or the same model with different configurations) to the same endpoint with weighted traffic distribution.
First, ensure you have at least two approved model versions in your `TechNewsVentureModelGroup`. You might need to run the pipeline twice, perhaps with a slightly modified `train.py` or different hyperparameters, to get two distinct models registered and approved.
import sagemaker
from sagemaker.model import ModelPackage
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import boto3
import time
session = sagemaker.Session()
sm_client = boto3.client("sagemaker", region_name=region)
role = "arn:aws:iam::123456789012:role/sagemaker-mlops-execution-role" # Your SageMaker execution role ARN
model_package_group_name = "TechNewsVentureModelGroup"
endpoint_name = "TechNewsVentureABTestEndpoint"
# --- Retrieve Approved Model Package ARNs ---
# In a real scenario, you'd have logic to select specific versions.
# For this example, let's assume we know two approved versions.
# You can find these ARNs in the SageMaker console under Model groups.
# Example: Get the latest two approved models from the Model Registry
def get_approved_model_packages(model_package_group_name, num_models=2):
response = sm_client.list_model_packages(
ModelPackageGroupName=model_package_group_name,
ModelPackageStatus="Completed",
SortBy="CreationTime",
SortOrder="Descending"
)
approved_models = []
for mp in response["ModelPackageSummaryList"]:
# Filter for Approved or InProgress if you have auto-approval
if mp["ModelApprovalStatus"] == "Approved":
approved_models.append(mp["ModelPackageArn"])
if len(approved_models) >= num_models:
break
return approved_models
model_arns = get_approved_model_packages(model_package_group_name, num_models=2)
if len(model_arns) < 2:
print("Warning: Less than 2 approved model versions found. Please ensure your pipeline ran successfully and models are approved in the Model Registry.")
# For demonstration, we'll proceed with one model if only one is available, or raise an error.
if not model_arns:
raise Exception("No approved model packages found to deploy.")
model_arns.append(model_arns[0]) # Use the same model twice if only one exists for A/B setup
print(f"Found approved model ARNs: {model_arns}")
# Create ModelPackage objects for deployment
model_v1 = ModelPackage(
role=role,
model_package_arn=model_arns[0],
sagemaker_session=session
)
model_v2 = ModelPackage(
role=role,
model_package_arn=model_arns[1],
sagemaker_session=session
)
# --- Define Endpoint Configuration for A/B Test ---