除了上篇提到的 Data Wrangler 外,SageMaker 還有許多配套的功能,其中有個叫做 Pipelines 的東西,說是可以用來構建、 管理及自動化深度學習流程,能夠節省人工操作,有那麼神?這次就來試試 Pipelines 能夠為我們帶來什麼體驗。
SageMaker Pipelines 提供了一個框架,讓你定義和自動化機器學習工作流程中的所有步驟(Step),包括資料預處理、特徵工程、模型訓練、模型評估和模型部署。 每個步驟可以由不同的SageMaker元件組成。
Step 為 Pipeline 中的基本單位,每個 Step 表示一個特定的任務或操作 :
接下來便簡易搭建一個 Pipeline 流程 :
1.準備環境
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.estimator import Estimator
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_context import PipelineSession
# SageMaker 會話和腳色
sagemaker_session = sagemaker.Session()
role = 'your-iam-role'
bucket = 'your-s3-bucket'
prefix = 'your-data-prefix'
# Pipeline 會話
pipeline_session = PipelineSession()
2.定義參數
# 定義數據前處理用執行個體
processing_instance_type = ParameterString(name="ProcessingInstanceType",
default_value="ml.m5.xlarge")
# 定義訓練用執行個體
training_instance_type = ParameterString(name="TrainingInstanceType",
default_value="ml.p3.2xlarge")
3.定義數據前處理step
# 定義數據處理流程
processor = ScriptProcessor(
role=role,
image_uri='your-processing-container',
command=['python3'],
instance_count=1,
instance_type=processing_instance_type,
sagemaker_session=sagemaker_session
)
# 定義 Step
step_process = ProcessingStep(
name='DataProcessing',
processor=processor,
inputs=[sagemaker.processing.ProcessingInput(
source=f's3://{bucket}/{prefix}/raw-data',
destination='/opt/ml/processing/input'
)],
outputs=[sagemaker.processing.ProcessingOutput(
source='/opt/ml/processing/output',
destination=f's3://{bucket}/{prefix}/processed-data'
)],
code='processing_script.py'
)
4.定義訓練Step
# 定義訓練流程
estimator = PyTorch(
entry_point='train.py',
role=role,
instance_count=1,
instance_type=training_instance_type,
framework_version='1.8.0',
py_version='py37',
script_mode=True,
output_path='s3://your-bucket/your-prefix/model',
sagemaker_session=sagemaker_session
)
# 定義訓練Step
step_train = TrainingStep(
name='ModelTraining',
estimator=estimator,
inputs={'train': 's3://your-bucket/your-prefix/processed-data'}
)
5.定義模型部屬Step
model = Model(
image_uri='your-inference-container',
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
role=role
)
step_model = ModelStep(
name='ModelDeployment',
model=model,
instance_count=1,
instance_type='ml.m5.large'
)
6.定義及創建Pipeline
# 定义Pipeline
pipeline = Pipeline(
name='MyPipeline',
parameters=[processing_instance_type, training_instance_type],
steps=[step_process, step_train, step_model]
)
# 创建和启动Pipeline
pipeline.upsert(role=role)
pipeline.start()
以上程式碼定義了三個 Step,分別包含數據前處理、模型訓練、模型部屬,並交由Pipeline 去順序執行,從示例可以看到,我們可以針對不同 Step 指定不同的執行個體,這意味著我們可以根據不同 Step 的運算需求使用最適合的環境,並且因為分離成不同的 Step,在維護及管理上便可以視為單一獨立的區塊去處理,在工作細化及分工上都可以帶來幫助。
SageMaker Pipeline 將整個訓練流程細分成多個 Step ,雖然增加了分工及管理上的優點,但前提是有著良好的分工狀態,若是專案不夠龐大,需要的處理不夠複雜,還是將所有流程寫在同一份程式碼內會相對好處理很多。
所以統整下大概符合下列條件再建構 Pipeline 才會有比較好的體驗 :