趋近智
虽然 Kubernetes 提供了运行容器的基础能力,但管理多步骤的机器学习过程需要更结构化的方法。一个典型的机器学习项目并非一个独立的应用程序;它是一个包含不同阶段的工作流程,例如数据摄取、预处理、模型训练、评估和部署。使用 kubectl 命令手动编排这个序列会很复杂且容易出错。这正是 Kubeflow 发挥作用的地方。
Kubeflow 是一个专为 Kubernetes 设计的开源机器学习工具集。它不替代 Kubernetes。相反,它在 Kubernetes 之上构建,提供了一系列工具,简化了大规模部署、监控和管理机器学习系统的过程。Kubeflow 的核心目标是让 Kubernetes 上的机器学习工作流实现可组合、可移植和可扩展。
编排这些工作流的核心功能是 Kubeflow Pipelines。它提供了一个框架和用户界面,用于构建和部署可复用的机器学习管道。
一个 Kubeflow Pipeline 是一个由容器化任务构成的有向无环图(DAG)。图中的每个任务都是一个独立的组件。让我们细致了解这些元素。
组件是管道的基本构建单元。它是一个独立的、容器化的应用程序,在您的工作流中执行一个单一的步骤。可以将组件视为一个具有强类型输入和输出的函数。例如,您可以有以下用途的组件:
因为每个组件都是一个容器,所以它封装了自身的代码和依赖项。这意味着数据预处理组件可以使用与模型训练组件不同的库集,甚至不同的 Python 版本,从而确保关注点清晰分离。
管道通过连接组件来定义机器学习工作流的结构。您定义一个组件的输出如何成为另一个组件的输入,从而创建一个依赖关系图。例如,处理后的数据路径(您的 preprocess 组件的输出)作为输入参数传递给您的 train 组件。
这种图结构使 Kubeflow 能够管理执行顺序,确保一个步骤只有在其所有依赖项都满足后才运行。它还允许独立步骤的并行执行,提高了整体效率。
一个典型的机器学习管道,定义为组件的图。实线表示直接依赖,而虚线可以表示条件步骤,例如仅当模型评估得分达到特定阈值时才部署模型。
Kubeflow Pipelines 最有效的特性之一是其 Python SDK (kfp)。它允许您使用熟悉的 Python 代码定义组件和管道,然后将其编译为 Kubernetes 可以理解的静态 YAML 配置。
让我们来看一个简化的例子,说明如何定义一个两步管道。
首先,您定义您的组件。@dsl.component 装饰器将 Python 函数转换为可复用的管道组件。您指定其依赖项并定义其输入和输出。
from kfp import dsl
from kfp.compiler import Compiler
# 组件 1: 预处理数据
@dsl.component(
base_image='python:3.9',
packages_to_install=['pandas==1.3.5']
)
def preprocess_data(
raw_data_path: str,
processed_data: dsl.OutputPath('Dataset')
):
"""加载原始数据,进行清洗,并保存到输出路径。"""
import pandas as pd
df = pd.read_csv(raw_data_path)
# Perform cleaning operations
df_cleaned = df.dropna()
df_cleaned.to_csv(processed_data, index=False)
print("数据预处理完成。")
# 组件 2: 训练模型
@dsl.component(
base_image='tensorflow/tensorflow:2.8.0', # 使用不同的镜像
)
def train_model(
dataset: dsl.InputPath('Dataset'),
model_output: dsl.Output[dsl.Model]
):
"""加载处理后的数据并训练一个简单模型。"""
import pandas as pd
# 训练逻辑的占位符
# 在实际场景中,您会加载数据并训练一个模型
df = pd.read_csv(dataset)
print(f"正在使用来自 {dataset} 的数据训练模型...")
# 保存一个示例模型文件
with open(model_output.path, 'w') as f:
f.write("这是一个训练好的模型工件。")
print(f"模型已保存到 {model_output.path}")
接下来,您使用 @dsl.pipeline 装饰器定义管道本身。在此函数内部,您实例化您的组件,并通过将一个任务的输出作为输入传递给另一个任务来将它们连接起来。
# 定义管道结构
@dsl.pipeline(
name='简单训练管道',
description='一个演示管道,用于预处理数据和训练模型。'
)
def my_first_pipeline(data_url: str):
# 实例化第一个任务
preprocess_task = preprocess_data(raw_data_path=data_url)
# 第二个任务使用第一个任务的输出
train_task = train_model(
dataset=preprocess_task.outputs['processed_data']
)
# 您还可以为特定步骤指定资源请求
train_task.set_cpu_limit('2').set_memory_limit('4G').add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
在此示例中,train_task 依赖于 preprocess_task,因为它使用了后者的输出。Kubeflow 自动处理此依赖关系。注意训练步骤如何配置为请求特定资源,包括 GPU。这使您能够仅将强大且昂贵的硬件分配给需要它的步骤,从而优化资源利用率和成本。
将 Kubeflow 整合到您的 MLOps 技术栈中,带来多项重要优势:
通过抽象底层 Kubernetes 对象,Kubeflow Pipelines 让数据科学家和机器学习工程师能够专注于用 Python 定义他们的工作流逻辑,而平台则处理调度、执行和资源管理等复杂工作。这使其成为为机器学习项目带来操作规范的有效工具。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造