趋近智
将机器学习模型部署到最终用户或下游应用程序,需要通过服务基础设施使其可用。这通常发生在模型使用 TorchScript、量化或剪枝等技术优化推理性能之后。手动构建和管理此类服务基础设施可能很复杂,涉及 API 开发、请求处理、扩缩容和监控。TorchServe 是一个专门为简化 PyTorch 模型这一过程而开发的工具。它提供了一种标准化的方式,用于在生产环境中打包、部署、管理和提供训练好的模型。
TorchServe 充当了桥梁,连接优化后的 PyTorch 模型与需要使用其预测结果的应用程序。它处理模型服务的操作方面,使您能够专注于模型开发和集成。
TorchServe 在设计时充分考虑了灵活性和性能。其主要组件共同协作以提供一个方案:
模型归档器 (torch-model-archiver): 这个命令行工具是为 TorchServe 准备模型的第一步。它将所有必需的工件打包成一个独立的归档文件,扩展名为 .mar。这些工件通常包括:
.pt 文件或标准的 state_dict)。TorchServe 运行时: 这是核心服务器进程。它侦听预定义网络端口上的传入请求。它管理已部署模型的生命周期,包括将它们加载到内存中、按模型扩展工作进程数量,以及将推理请求路由到适当的工作进程。
处理器: 处理器是 Python 脚本或类,它们定义 TorchServe 如何与您的特定模型交互。它们封装了以下逻辑:
initialize(context): 模型加载时调用一次。用于将模型加载到内存和进行一次性设置。preprocess(data): 将传入请求数据转换为模型 forward 方法期望的格式(例如,解码图像、文本分词)。inference(model_input): 通过调用模型的预测函数来执行实际推理。postprocess(inference_output): 将模型的原始输出转换为用户友好的格式(例如,将类别索引映射到标签,格式化 JSON)。
TorchServe 提供了用于常见任务(图像分类、对象检测、文本分类)的多个内置处理器,但您可以轻松地为特殊的模型输入/输出或复杂的工作流创建自定义处理器。API: TorchServe 公开了两个主要 REST API:
gRPC 支持也可用,用于低延迟通信,在微服务架构中尤其有用。
使用 TorchServe 部署模型通常遵循以下步骤:
torch.jit.save() 保存 TorchScript 模型或使用 torch.save() 保存 state_dict),并收集所有必需的辅助文件。.py 文件)。torch-model-archiver 将模型、处理器和其他文件打包成一个 .mar 归档。.mar 文件将位于或注册到该目录。.mar 文件加载您的模型并准备好服务。您可以指定初始工作进程数量。TorchServe 部署工作流程的高级概述,描绘了准备步骤和运行时请求处理。
torch-model-archiver 工具对于模型打包非常核心。以下是典型的命令结构:
torch-model-archiver \
--model-name my_transformer_model \
--version 1.0 \
--serialized-file traced_transformer.pt \
--handler transformer_handler.py \
--extra-files "vocab.txt,config.json" \
--export-path /path/to/model-store \
--force
让我们分析一下这些参数:
--model-name: 在 API 调用中使用的模型逻辑名称(例如,my_transformer_model)。--version: 模型的版本字符串(例如,1.0)。TorchServe 可以管理同一模型的多个版本。--serialized-file: 已保存模型文件的路径(例如,torch.jit.save 的输出)。如果使用 state_dict,您还需要 --model-file 指向您的模型定义 Python 文件。--handler: 您的处理器脚本(.py)的路径。这可以是内置处理器之一(例如,image_classifier,text_classifier),也可以是您的自定义脚本。--extra-files: 模型或处理器所需的额外文件的逗号分隔列表(例如,分词器配置、词汇文件、标签映射)。这些文件将通过上下文对象在处理器的 initialize 方法中访问。--export-path: 生成的 .mar 文件将保存到的目录。这通常设置为 TorchServe 模型存储目录。--force: 如果 .mar 文件已存在,则覆盖它。执行此命令将创建 /path/to/model-store/my_transformer_model.mar。
虽然 TorchServe 的内置处理器涵盖了许多常见用例,但您通常会需要自定义逻辑。自定义处理器是一个 Python 脚本,其中包含一个类(通常继承自 BaseHandler),它实现了以下部分或全部方法:
# custom_handler.py
import torch
import json
from ts.torch_handler.base_handler import BaseHandler
import logging
import os
logger = logging.getLogger(__name__)
class MyCustomHandler(BaseHandler):
"""
自定义处理器,用于处理特定输入/输出格式。
"""
def __init__(self):
super().__init__()
self.initialized = False
self.model = None
self.mapping = None
def initialize(self, context):
"""
加载模型和额外文件。模型加载时调用一次。
"""
self.manifest = context.manifest
properties = context.system_properties
model_dir = properties.get("model_dir") # 包含解压后的 MAR 内容的目录
# 确定设备
self.device = torch.device("cuda:" + str(properties.get("gpu_id")) if torch.cuda.is_available() and properties.get("gpu_id") is not None else "cpu")
logger.info(f"处理器在设备上初始化: {self.device}")
# 加载模型(假设为 TorchScript 的示例)
serialized_file = self.manifest['model']['serializedFile']
model_pt_path = os.path.join(model_dir, serialized_file)
if not os.path.isfile(model_pt_path):
raise RuntimeError("缺少 model.pt 文件")
self.model = torch.jit.load(model_pt_path, map_location=self.device)
self.model.eval()
logger.info(f"模型 {self.manifest['model']['modelName']} 加载成功。")
# 加载额外文件(例如,标签映射)
mapping_file_path = os.path.join(model_dir, "index_to_name.json") # 假设通过 --extra-files 传入
if os.path.isfile(mapping_file_path):
with open(mapping_file_path) as f:
self.mapping = json.load(f)
logger.info("标签映射加载成功。")
else:
logger.warning("映射文件未找到。")
self.initialized = True
def preprocess(self, data):
"""
将原始输入数据转换为模型输入张量。
'data' 是一个字典列表,每个字典包含原始请求数据。
"""
# 示例:假设输入是 JSON,如 {'text': 'some input string'}
# 这在很大程度上取决于您的具体应用程序
processed_inputs = []
for row in data:
request_body = row.get("data") or row.get("body") # 处理不同的输入源
if isinstance(request_body, (bytes, bytearray)):
request_body = request_body.decode('utf-8')
# 在此处添加您的特定预处理逻辑(分词、张量创建等)
# 为简单起见,我们假设 request_body 是所需的直接输入
# 实际上,您会进行文本分词、图像解码/大小调整等操作。
# 这部分必须返回 self.model() 期望格式的数据。
logger.info(f"收到输入: {request_body}")
# 虚拟预处理:直接传递(替换为真实逻辑)
processed_inputs.append(request_body)
# 示例:如果模型需要,将处理后的输入转换为批处理张量
# input_tensor = self.tokenizer(processed_inputs, return_tensors="pt", padding=True, truncation=True).to(self.device)
# return input_tensor
return processed_inputs # 返回列表作为虚拟示例
def inference(self, model_input):
"""
使用模型运行推理。
'model_input' 是 preprocess() 的输出。
"""
# 示例:运行已加载的 TorchScript 模型
# 输出取决于您的模型结构
# 确保 model_input 在正确的设备上
# output = self.model(model_input.to(self.device))
# 虚拟推理:只回显输入(替换为真实模型调用)
logger.info(f"正在对: {model_input} 运行虚拟推理")
with torch.no_grad(): # 对推理来说必不可少
# 将此替换为:output = self.model(model_input)
output = [f"Processed: {item}" for item in model_input]
return output
def postprocess(self, inference_output):
"""
将模型输出转换为用户友好的格式。
'inference_output' 是 inference() 的输出。
返回预测结果列表,每个输入请求一个。
"""
# 示例:将原始模型输出(例如,logits)转换为标签/分数
# predictions = torch.softmax(inference_output, dim=1).argmax(dim=1).cpu().tolist()
# result = [self.mapping[str(pred)] if self.mapping else str(pred) for pred in predictions]
# 虚拟后处理:只返回推理输出
logger.info(f"后处理: {inference_output}")
return inference_output # 应该是一个列表
# 注意:处理器文件名必须与 --handler 参数值匹配(不带 .py)
# 如果处理器类名与首字母大写的文件名不同,
# 则在 MANIFEST.json 中或通过归档器指定。
此示例显示了基本结构。您将用实际的预处理(例如,图像转换、文本分词)、模型调用和后处理(例如,将输出索引映射到类别名称、格式化 JSON 响应)来替换虚拟逻辑。context 对象提供对系统属性(如 GPU 可用性)、模型目录和清单详情的访问。
一旦您的 .mar 文件在模型存储中,您就可以启动 TorchServe:
# 如果模型存储目录不存在,则创建它
mkdir /path/to/model-store
# 启动 TorchServe,指向模型存储
torchserve --start \
--model-store /path/to/model-store \
--models my_model=/path/to/model-store/my_transformer_model.mar \
--ts-config /path/to/config.properties
--start: 在后台启动 TorchServe 服务器。使用 torchserve --stop 停止它。--model-store: 指定包含 .mar 文件的目录。--models:(可选)在启动时预加载并注册特定模型。格式为 model_name=model_archive.mar,如果文件在模型存储中,也可以是简单的 model_name.mar。您可以指定多个模型。--ts-config:(可选)用于自定义端口、JVM 参数、日志等的配置文件(.properties)路径。启动后,您将与 API 交互,通常使用 curl 或 Python 中的 requests 等客户端库。
注册模型(管理 API):
curl -X POST "http://localhost:8081/models?url=my_transformer_model.mar&model_name=transformer&initial_workers=1&synchronous=true"
此命令告知 TorchServe 从其 .mar 文件加载 my_transformer_model.mar(假设它在模型存储中),将其注册为逻辑名称 transformer,为其启动 1 个工作进程,并等待注册完成。
发送推理请求(推理 API):
具体格式取决于您的处理器的 preprocess 方法。如果它期望原始图像字节:
curl -X POST http://localhost:8080/predictions/transformer -T image.jpg
如果它期望 JSON:
curl -X POST http://localhost:8080/predictions/transformer -H "Content-Type: application/json" -d '{"text": "This is an example sentence."}'
响应将包含来自您的处理器的 postprocess 方法的输出。
扩缩工作进程(管理 API):
如果您需要特定模型的更高吞吐量,您可以增加工作进程的数量:
curl -X PUT "http://localhost:8081/models/transformer?min_worker=4"
这会将 transformer 模型的工作进程数量扩缩到 4(TorchServe 处理它们之间的负载均衡)。
检查状态和指标:
curl http://localhost:8081/modelscurl http://localhost:8081/models/transformercurl http://localhost:8082/metricsTorchServe 专为生产负载而设计。有助于提高性能的特点包括:
preprocess 和 inference 中实现批处理逻辑,以同时处理多个请求,从而提高 GPU 利用率。TorchServe 还具有可通过管理 API 或配置文件配置的内置动态批处理能力。通过使用 TorchServe,您能显著减少部署和管理 PyTorch 模型所需的工程工作量,且确保可靠高效。它提供了一个标准化、功能丰富的平台,与 PyTorch 生态系统和常见的 MLOps 实践良好集成,使其成为将您的先进模型投入生产的重要工具。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造