趋近智
一个实用的集成式流程展示了 DVC 和 MLflow 如何共同工作,以管理数据、追踪实验,并自动化流程执行,从而提升可复现性。
我们将创建一个简单的机器学习流程,包含两个阶段:数据准备和模型训练。DVC 将管理数据工件并调度流程,而 MLflow 则会追踪训练阶段产生的参数、指标和模型。
请确保您已安装并配置以下内容:
pip install dvc[s3] 或等效命令)pip install mlflow scikit-learn pandas)git init, git add, git commit)。首先,让我们设置项目目录结构。
为项目创建一个新目录并进入该目录:
mkdir integrated-pipeline-example
cd integrated-pipeline-example
初始化 Git 和 DVC:
git init
dvc init
这将创建 .git 和 .dvc 目录。请记住提交初始 DVC 配置文件:
git add .dvc .dvcignore
git commit -m "Initialize DVC"
创建必要的子目录:
mkdir data src models data/raw data/processed
为代码和参数创建占位文件:
touch src/prepare.py src/train.py params.yaml requirements.txt
添加一个简单的原始数据集。对于本例,创建一个虚拟 CSV 文件 data/raw/data.csv:
feature1,feature2,target
1.0,2.1,0
1.5,2.5,0
1.8,2.9,0
3.2,4.5,1
3.5,5.1,1
4.0,6.0,1
0.8,1.9,0
3.8,5.5,1
将所有项目结构文件添加到 Git:
git add data/raw/data.csv src/ params.yaml requirements.txt models/ .gitignore
# (如果需要,请创建一个 .gitignore 文件,例如添加 __pycache__/, *.pyc 等)
git commit -m "Initial project structure and raw data"
此阶段将获取原始数据,执行简单的准备步骤(如分割),并保存处理后的数据。DVC 将管理这些处理后的数据文件。
编辑 src/prepare.py:
# src/prepare.py
import pandas as pd
from sklearn.model_selection import train_test_split
import os
import yaml
# 确保处理后的目录存在
os.makedirs('data/processed', exist_ok=True)
# 加载参数
with open('params.yaml', 'r') as f:
params = yaml.safe_load(f)
split_ratio = params['prepare']['split']
seed = params['base']['seed']
# 加载原始数据
raw_data_path = 'data/raw/data.csv'
df = pd.read_csv(raw_data_path)
# 分割数据
train_df, test_df = train_test_split(df, test_size=split_ratio, random_state=seed)
# 保存处理后的数据
train_output_path = 'data/processed/train.csv'
test_output_path = 'data/processed/test.csv'
train_df.to_csv(train_output_path, index=False)
test_df.to_csv(test_output_path, index=False)
print(f"处理后的数据已保存:")
print(f"- 训练集: {train_output_path}")
print(f"- 测试集: {test_output_path}")
编辑 params.yaml:定义用于准备阶段和即将到来的训练阶段的参数。
# params.yaml
base:
seed: 42
prepare:
split: 0.3 # 测试集比例
train:
model_type: LogisticRegression
solver: 'liblinear' # LogisticRegression 的示例参数
C: 1.0 # 正则化强度
定义 DVC 阶段:使用 dvc stage add 在 dvc.yaml 中定义此准备步骤。此命令告诉 DVC 如何运行脚本、其依赖项是什么以及它产生哪些输出。
dvc stage add -n prepare \
-p base.seed,prepare.split \
-d src/prepare.py -d data/raw/data.csv \
-o data/processed/train.csv -o data/processed/test.csv \
python src/prepare.py
-n prepare:将阶段命名为“prepare”。-p base.seed,prepare.split:声明此阶段使用的来自 params.yaml 的参数。DVC 将追踪这些特定参数的更改。-d src/prepare.py -d data/raw/data.csv:指定脚本和数据依赖项。如果这些发生更改,则需要重新运行此阶段。-o data/processed/train.csv -o data/processed/test.csv:声明此阶段产生的输出。DVC 将开始追踪这些文件。python src/prepare.py:此阶段要执行的命令。提交更改:dvc stage add 命令会创建/更新 dvc.yaml 和 dvc.lock,并将输出文件 (data/processed/*.csv) 添加到 DVC 追踪(为它们创建 .dvc 文件)。将这些更改提交到 Git。
git add dvc.yaml dvc.lock data/processed/.gitignore src/prepare.py params.yaml
git commit -m "Add DVC stage: prepare data"
注意:DVC 会自动将输出路径 (data/processed/train.csv, data/processed/test.csv) 添加到 .gitignore(在 data/processed/.gitignore 中),这样 Git 就会忽略大型数据文件本身。
如果您已配置远程存储,可以选择将 DVC 追踪的数据推送到远程存储 (dvc push)。
此阶段使用处理后的数据训练模型,使用 MLflow 记录实验,并保存模型和指标。DVC 管理依赖项(处理后的数据、脚本、参数)和输出(模型文件、指标文件)。
编辑 src/train.py:此脚本现在包含了 MLflow 日志记录功能。
# src/train.py
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import mlflow
import mlflow.sklearn
import os
import yaml
import pickle
import json
# 加载参数
with open('params.yaml', 'r') as f:
params = yaml.safe_load(f)
# --- MLflow 设置 ---
# 可选:如果使用远程服务器,设置追踪 URI
# mlflow.set_tracking_uri("http://...")
mlflow.set_experiment("Simple Classification")
# 启动 MLflow 运行
with mlflow.start_run():
seed = params['base']['seed']
model_params = params['train']
# --- 记录参数 ---
mlflow.log_param("seed", seed)
mlflow.log_params(model_params) # 记录所有训练参数
# --- 加载数据 ---
train_data_path = 'data/processed/train.csv'
test_data_path = 'data/processed/test.csv'
train_df = pd.read_csv(train_data_path)
test_df = pd.read_csv(test_data_path)
X_train = train_df[['feature1', 'feature2']]
y_train = train_df['target']
X_test = test_df[['feature1', 'feature2']]
y_test = test_df['target']
# --- 记录 DVC 数据信息(示例) ---
# 将输入数据的路径或哈希值记录为标签
# 这需要解析 dvc.lock 或使用 dvc api,此处已简化
mlflow.set_tag("train_data_path", train_data_path)
mlflow.set_tag("test_data_path", test_data_path)
# 一种方法是获取 dvc.lock 中的哈希值
# --- 训练模型 ---
# 使用 params.yaml 中定义的逻辑回归模型
if model_params['model_type'] == 'LogisticRegression':
model = LogisticRegression(
solver=model_params['solver'],
C=model_params['C'],
random_state=seed
)
else:
raise ValueError(f"Unsupported model type: {model_params['model_type']}")
model.fit(X_train, y_train)
# --- 评估模型 ---
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
# --- 记录指标 ---
mlflow.log_metric("accuracy", accuracy)
print(f"模型准确率: {accuracy:.4f}")
# --- 保存和记录模型 ---
os.makedirs('models', exist_ok=True)
model_output_path = 'models/model.pkl'
with open(model_output_path, 'wb') as f:
pickle.dump(model, f)
# 使用 MLflow 的 scikit-learn 集成记录模型
mlflow.sklearn.log_model(model, "sklearn-model")
print(f"模型已保存到: {model_output_path}")
print(f"模型已记录到 MLflow 运行: {mlflow.active_run().info.run_id}")
# --- 保存指标文件(用于 DVC 追踪) ---
metrics_output_path = 'metrics.json'
metrics_data = {'accuracy': accuracy}
with open(metrics_output_path, 'w') as f:
json.dump(metrics_data, f, indent=4)
print(f"指标已保存到: {metrics_output_path}")
print("MLflow 运行完成。")
定义 DVC 阶段:将训练阶段添加到 dvc.yaml。它依赖于“prepare”阶段的输出、训练脚本和相关参数。它生成模型文件和指标文件。
dvc stage add -n train \
-p base.seed,train \
-d src/train.py -d data/processed/train.csv -d data/processed/test.csv \
-o models/model.pkl \
-m metrics.json \
python src/train.py
-n train:将阶段命名为“train”。-p base.seed,train:追踪 general seed 和 params.yaml 中 train 部分下的所有参数。-d ...:指定依赖项:脚本 (src/train.py) 和上一阶段的输出 (data/processed/*.csv)。-o models/model.pkl:将模型文件声明为 DVC 追踪的输出。-m metrics.json:将 metrics.json 声明为指标文件。DVC 可以从此类文件解析和显示指标。python src/train.py:要执行的命令。提交更改:提交更新后的 dvc.yaml、dvc.lock、新的训练脚本、DVC 追踪的模型占位符和指标文件定义。
git add dvc.yaml dvc.lock src/train.py models/.gitignore metrics.json
# DVC 创建 models/.gitignore 来使 Git 忽略实际的模型文件
git commit -m "Add DVC stage: train model with MLflow logging"
现在我们已在 dvc.yaml 中定义了一个两阶段流程。
执行流程:使用 dvc repro 从头到尾运行整个流程。DVC 会检查依赖项并按需执行阶段。
dvc repro
您将看到来自 prepare.py 和 train.py 的输出,包括 MLflow 日志消息和最终准确率。DVC 将首先运行“prepare”,然后运行“train”。
检查状态:
git status:应显示 dvc.lock 和 metrics.json 可能已更改(因为流程已运行)。提交这些更新。
git add dvc.lock metrics.json
git commit -m "Run integrated pipeline"
dvc status:应显示流程已是最新状态。dvc metrics show:显示从 metrics.json 追踪的指标。检查 MLflow 用户界面:启动 MLflow 用户界面以查看追踪到的实验运行:
mlflow ui
在浏览器中导航到 http://localhost:5000(或您配置的地址)。找到“Simple Classification”实验。您应该看到一个记录了以下内容的运行:
seed、model_type、solver、C。train_data_path、test_data_path。accuracy。sklearn-model(包括 model.pkl、conda.yaml、python_env.yaml、requirements.txt)。该图表显示了 Git、DVC 追踪的工件/阶段以及 MLflow 实验追踪在项目结构中的关系。
现在,让我们看看当我们进行更改时,这种集成设置的能力。
修改参数:编辑 params.yaml 并更改一个训练参数,例如正则化强度 C:
# params.yaml
base:
seed: 42
prepare:
split: 0.3
train:
model_type: LogisticRegression
solver: 'liblinear'
C: 0.1 # 从 1.0 更改为 0.1
复现:再次运行 dvc repro。
dvc repro
注意 DVC 检测到 params.yaml 中影响“train”阶段的更改。它会跳过“prepare”阶段(因为其依赖项没有改变),只重新运行“train”阶段。训练脚本再次执行,以 C=0.1 的参数向 MLflow 记录一个新的运行。
验证:
dvc.lock 和 metrics.json 提交到 Git。
git add dvc.lock metrics.json params.yaml
git commit -m "Experiment: Change C to 0.1"
C) 的差异以及产生的 accuracy。这个动手实践说明了 DVC 流程如何根据依赖项(代码、数据、参数)的变化自动化执行流程,同时 MLflow 捕获每次执行的具体信息(参数、指标、工件)。通过将 DVC 元数据(dvc.yaml、dvc.lock)与您的代码和参数一起提交到 Git,您就创建了一个完全版本化和可复现的机器学习流程。任何有权访问您的 Git 仓库和 DVC 远程存储的人,都可以检出特定的提交,并复现您的精确流程和结果。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造