趋近智
TFX 流水线由 ExampleGen、Transform 和 Trainer 等各个组件构成。每个组件都在机器学习 (machine learning)工作流程中执行特定步骤。然而,定义这些组件及其依赖关系只是其中一部分。要在生产环境中将这些组件作为协调一致、自动化且可靠的系统运行,需要一个编排器。
编排器是负责调度、执行和监控流水线内任务(TFX 组件)的系统。它理解您在组件之间定义的依赖关系,并确保它们按正确顺序运行。如果某个组件失败,编排器可以处理重试或通知您问题。它管理流水线运行的整体生命周期。
可以将 TFX 组件视为复杂食谱中的单独指令。编排器就是厨师,负责阅读食谱,收集食材(数据制品),按顺序执行每个步骤(运行组件),并管理厨房(计算资源)。
虽然 TFX 提供了 LocalDagRunner 用于在本地环境中直接运行流水线(适用于开发和测试),但生产环境需要更精细的管理。编排器提供:
TFX 被设计为与编排器无关,这意味着您可以在各种平台上运行 TFX 流水线。TFX 流水线的核心定义大体保持不变,但您将使用特定的 Runner 类或配置来指定所选的编排器。开源社区中两个受欢迎的选择是 Apache Airflow 和 Kubeflow Pipelines。
Apache Airflow 是一个广泛采用的通用工作流编排器。Airflow 中的流水线使用 Python 定义为有向无环图 (DAG)。TFX 提供将基于 Python 的 TFX 流水线定义编译成 Airflow DAG 的工具。
tfx.orchestration.airflow.AirflowDagRunner 将您的 TFX 流水线编译成 Airflow DAG 文件。Kubeflow Pipelines 专为在 Kubernetes 上编排机器学习 (machine learning)工作流而设计。它使用容器执行每个流水线步骤,提供出色的隔离和环境管理。
tfx.orchestration.kubeflow.KubeflowDagRunner(或针对新 KFP v2 API 的 KubeflowV2DagRunner)将您的流水线编译成 YAML 文件,该文件可以上传并在 Kubeflow Pipelines 部署上运行。tfx.orchestration.LocalDagRunner 在您的本地 Python 环境中顺序执行组件。仅适用于开发或小型实验。无论使用哪种编排器,您都可以通过实例化组件并连接它们的输入和输出来在 Python 中定义 TFX 流水线。
# 示例代码段
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer
from tfx.proto import example_gen_pb2
from tfx.orchestration import pipeline
# 假设已定义了必要的输入,例如 data_path, module_file
# 1. 实例化组件
example_gen = CsvExampleGen(input_base=data_path)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
example_validator = ExampleValidator(statistics=statistics_gen.outputs['statistics'], schema=schema_gen.outputs['schema'])
transform = Transform(examples=example_gen.outputs['examples'], schema=schema_gen.outputs['schema'], module_file=transform_module_file)
trainer = Trainer(module_file=trainer_module_file, examples=transform.outputs['transformed_examples'], transform_graph=transform.outputs['transform_graph'], schema=schema_gen.outputs['schema'])
# ... 添加 Evaluator, Pusher 等
# 2. 定义组件列表
components = [
example_gen,
statistics_gen,
schema_gen,
example_validator,
transform,
trainer,
# ...
]
# 3. 创建流水线定义
pipeline_definition = pipeline.Pipeline(
pipeline_name='my_tfx_pipeline',
pipeline_root='/path/to/pipeline/artifacts', # 由编排器管理
components=components,
enable_cache=True,
metadata_connection_config=None # 配置 ML 元数据连接
# beam_pipeline_args 与 Dataflow/Spark 运行器相关
)
# 4. 为特定编排器编译(在单独的脚本/CLI 步骤中)
# 示例编译命令:
# tfx pipeline compile --engine kubeflow --pipeline_path my_pipeline_definition.py --output_path my_pipeline.yaml
# 或在 Python 中使用运行器:
# from tfx.orchestration.kubeflow import KubeflowDagRunner
# KubeflowDagRunner().run(pipeline_definition)
Python 代码定义了结构和逻辑。编译步骤将此定义转换为所选编排器所需的特定格式(例如,Airflow DAG Python 文件、KFP YAML 文件)。编排器随后获取此编译后的定义并执行它。
TFX 流水线的一个典型依赖关系图,由编排器管理。组件在其输入(制品以标签形式表示)可用后运行。
选择编排器取决于几个因素:
通过使用编排器,您可以将 TFX 组件定义转换为一个可管理、自动化且可用于生产的机器学习系统。这使您能够专注于改进模型和数据,而编排器则处理一致且可靠地运行流水线的操作复杂性。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造