This hands-on section guides you through building a basic two-agent system in which the agents communicate over an agreed-upon protocol to accomplish a shared goal. We will focus on the mechanics of message passing and structured payloads, building on the principles discussed earlier in this chapter.

Our goal is to create a system with two agents: a ContentIdeaGeneratorAgent and a DraftWriterAgent. The ContentIdeaGeneratorAgent will propose a topic for a short blog post and send it to the DraftWriterAgent. The DraftWriterAgent will receive the topic, use a large language model (LLM) to generate a short draft (a few paragraphs) on that topic, and then send the draft back to the ContentIdeaGeneratorAgent. This exercise will solidify your understanding of message-based communication, agent roles, and basic LLM integration in a multi-agent setting.

## Designing the Communication Protocol

Before writing any code, it is important to define how our agents will communicate with each other. A clear protocol ensures messages are understood correctly. We will use JSON as the message format because it is simple and widely used. Messages will have the following structure:

```json
{
  "sender_id": "string",    // ID of the agent sending the message
  "receiver_id": "string",  // ID of the intended receiving agent
  "message_id": "string",   // Unique ID for the message
  "session_id": "string",   // ID for the conversation or task session
  "message_type": "string", // Type of message (e.g., "IDEA_PROPOSAL", "DRAFT_RESPONSE")
  "payload": {              // Content of the message
    // ... type-specific content goes here
  }
}
```

For our scenario, we need two main message types:

- **IDEA_PROPOSAL**: Sent from the ContentIdeaGeneratorAgent to the DraftWriterAgent. payload: `{"topic": "The suggested topic string"}`
- **DRAFT_RESPONSE**: Sent from the DraftWriterAgent to the ContentIdeaGeneratorAgent. payload: `{"topic": "The original topic string", "draft": "The generated draft string"}`

We include the original topic in the response to provide context, which is good practice.
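To make the protocol concrete, here is what a complete IDEA_PROPOSAL message might look like on the wire. The ID values shown are illustrative placeholders; in the implementation below they are generated with uuid.uuid4():

```json
{
  "sender_id": "IdeaGenerator_001",
  "receiver_id": "DraftWriter_001",
  "message_id": "c1a2b3d4-...",
  "session_id": "e5f6a7b8-...",
  "message_type": "IDEA_PROPOSAL",
  "payload": {
    "topic": "The future of serverless computing with WebAssembly."
  }
}
```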
Here is a diagram illustrating the interaction flow:

```dot
digraph TwoAgentCommunication {
    rankdir=TB;
    node [shape=Mrecord, style="filled", fillcolor="#e9ecef", color="#495057", fontname="Arial"];
    edge [fontname="Arial", color="#495057"];

    Generator [label="{ContentIdeaGeneratorAgent | Proposes a topic}", fillcolor="#a5d8ff"];
    Writer [label="{DraftWriterAgent | Writes the draft}", fillcolor="#96f2d7"];

    Generator -> Writer [label="Message\nType: IDEA_PROPOSAL\nPayload: {topic: ...}"];
    Writer -> Generator [label="Message\nType: DRAFT_RESPONSE\nPayload: {topic: ..., draft: ...}"];
}
```

The ContentIdeaGeneratorAgent initiates communication by sending an IDEA_PROPOSAL message to the DraftWriterAgent. The DraftWriterAgent processes this request and replies with a DRAFT_RESPONSE message containing the generated content.

## Setting Up Your Environment

Make sure you have Python 3.8+ installed. For this exercise, we will use the OpenAI API for LLM capabilities. Ensure the openai library is installed (`pip install openai`) and that your OpenAI API key is configured as an environment variable (OPENAI_API_KEY).

We will define simple classes for our agents. In a more complex system you might use a multi-agent framework, but for learning the fundamentals of communication, a direct implementation is very instructive.

## Implementing the Agents

Let's start by creating a helper function for interacting with the LLM. It abstracts away the API call.

```python
import os
import uuid
import json
from openai import OpenAI

# Initialize the OpenAI client.
# Make sure the OPENAI_API_KEY environment variable is set.
client = OpenAI()

def generate_llm_response(prompt_text, model="gpt-3.5-turbo", max_tokens=200):
    """
    Generates an LLM response for the given prompt.
    """
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt_text}],
            max_tokens=max_tokens,
            temperature=0.7
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling LLM: {e}")
        return "Error: Could not generate LLM response."
```

Next, the agents need a channel to exchange messages over:

```python
# For this example, we use a simple in-memory message queue.
# In a real system, this would be a message broker such as RabbitMQ or Kafka.
message_queue = []

def send_message(sender_id, receiver_id, message_type, payload, session_id):
    """
    Simulates sending a message by appending it to the queue.
    """
    message = {
        "sender_id": sender_id,
        "receiver_id": receiver_id,
        "message_id": str(uuid.uuid4()),
        "session_id": session_id,
        "message_type": message_type,
        "payload": payload
    }
    print(f"\n[MESSAGE SENT] From: {sender_id} To: {receiver_id} Type: {message_type}")
    print(f"Payload: {json.dumps(payload, indent=2)}")
    message_queue.append(message)

def receive_message(agent_id):
    """
    Simulates an agent checking its mailbox (our queue).
    Returns the first message addressed to the agent, or None if there is none.
    """
    for i, msg in enumerate(message_queue):
        if msg["receiver_id"] == agent_id:
            message = message_queue.pop(i)
            print(f"\n[MESSAGE RECEIVED] By: {agent_id} From: {message['sender_id']} Type: {message['message_type']}")
            print(f"Payload: {json.dumps(message['payload'], indent=2)}")
            return message
    return None
```
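Before adding the agents, you can sanity-check this transport layer with a quick round trip. This is a throwaway snippet for experimentation, not part of the final script; it assumes the definitions above are already loaded:

```python
# Illustrative smoke test for the in-memory queue.
send_message("AgentA", "AgentB", "PING", {"hello": "world"}, session_id="test-session")

msg = receive_message("AgentB")           # pops the first message addressed to AgentB
assert msg["message_type"] == "PING"
assert msg["payload"]["hello"] == "world"
assert receive_message("AgentB") is None  # AgentB's mailbox is now empty
```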
### ContentIdeaGeneratorAgent

This agent initiates the process.

```python
class ContentIdeaGeneratorAgent:
    def __init__(self, agent_id="IdeaGenerator_001"):
        self.agent_id = agent_id
        self.current_session_id = None

    def propose_idea(self, idea_prompt="Suggest a compelling topic for a short tech blog post aimed at developers."):
        """
        Uses the LLM to generate a topic idea and sends it to the DraftWriterAgent.
        """
        print(f"\n[{self.agent_id}] Generating content idea...")
        topic = generate_llm_response(idea_prompt, max_tokens=50)
        if "Error:" in topic:
            print(f"[{self.agent_id}] Failed to generate topic: {topic}")
            return

        print(f"[{self.agent_id}] Generated Topic: {topic}")
        self.current_session_id = str(uuid.uuid4())  # Start a new session

        payload = {"topic": topic}
        send_message(
            sender_id=self.agent_id,
            receiver_id="DraftWriter_001",  # Target agent ID
            message_type="IDEA_PROPOSAL",
            payload=payload,
            session_id=self.current_session_id
        )

    def handle_response(self, message):
        """
        Handles the draft response from the DraftWriterAgent.
        """
        if message and message["message_type"] == "DRAFT_RESPONSE":
            if message["session_id"] == self.current_session_id:
                print(f"\n[{self.agent_id}] Received draft for topic: '{message['payload']['topic']}'")
                print(f"[{self.agent_id}] Draft Content:\n{message['payload']['draft']}")
                self.current_session_id = None  # End the session
            else:
                print(f"[{self.agent_id}] Received response for an old or unknown session: {message['session_id']}")
        else:
            print(f"[{self.agent_id}] Received an unexpected message or no message.")
```

### DraftWriterAgent

This agent waits for a topic and then writes a draft.

```python
class DraftWriterAgent:
    def __init__(self, agent_id="DraftWriter_001"):
        self.agent_id = agent_id

    def process_idea_proposal(self, message):
        """
        Receives a topic, generates a draft, and sends it back.
        """
        if message and message["message_type"] == "IDEA_PROPOSAL":
            topic = message["payload"]["topic"]
            session_id = message["session_id"]
            print(f"\n[{self.agent_id}] Received topic: '{topic}'. Generating draft...")

            draft_prompt = f"Write a short, engaging blog post draft (2-3 paragraphs) on the topic: '{topic}'. The tone should be informative and slightly informal."
            draft = generate_llm_response(draft_prompt, max_tokens=300)

            if "Error:" in draft:
                print(f"[{self.agent_id}] Failed to generate draft for topic '{topic}': {draft}")
                # Optionally, send an error message back
                payload = {"topic": topic, "error": "Failed to generate draft"}
                send_message(
                    sender_id=self.agent_id,
                    receiver_id=message["sender_id"],
                    message_type="DRAFT_ERROR",  # A new message type for errors
                    payload=payload,
                    session_id=session_id
                )
                return

            response_payload = {"topic": topic, "draft": draft}
            send_message(
                sender_id=self.agent_id,
                receiver_id=message["sender_id"],
                message_type="DRAFT_RESPONSE",
                payload=response_payload,
                session_id=session_id
            )
        else:
            print(f"[{self.agent_id}] Received an unexpected message type or no message.")

    def listen_for_work(self):
        """
        The agent's main loop for checking messages.
        In a real application, this would be event-driven or use a proper
        message queue listener.
        """
        print(f"\n[{self.agent_id}] Listening for incoming tasks...")
        message = receive_message(self.agent_id)
        if message:
            self.process_idea_proposal(message)
            return True  # Indicates work was done
        return False  # No message was processed
```
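Note that listen_for_work checks the mailbox exactly once. If you want the writer to keep checking until a task arrives, a simple polling wrapper is enough for experimentation. The helper below is a hypothetical addition, not part of the script built in this section:

```python
import time

def poll_for_work(agent, timeout_s=10.0, interval_s=0.5):
    """
    Polls an agent's mailbox until a message is handled or the timeout expires.
    A crude stand-in for a real event-driven listener.
    """
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if agent.listen_for_work():
            return True   # a message was received and processed
        time.sleep(interval_s)
    return False          # timed out with no work
```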
## Simulating the Interaction

Now, let's instantiate the agents and run the simulation.

```python
def run_simulation():
    # Instantiate the agents
    idea_agent = ContentIdeaGeneratorAgent()
    writer_agent = DraftWriterAgent()

    print("--- Starting Two-Agent Communication Simulation ---")

    # 1. The IdeaGeneratorAgent proposes an idea
    idea_agent.propose_idea("The future of serverless computing with WebAssembly.")

    # 2. The DraftWriterAgent listens for work and processes the idea.
    # In a real system, agents would run in separate threads/processes and
    # listen continuously. Here, we simulate this with an explicit call to
    # listen_for_work.
    work_done = writer_agent.listen_for_work()
    if not work_done:
        print("DraftWriterAgent found no work immediately.")
        # In a real scenario it would keep listening. For this demo, this
        # means the message was not received.

    # 3. The IdeaGeneratorAgent listens for the response.
    # We need to make sure idea_agent also checks its "mailbox".
    response_message = receive_message(idea_agent.agent_id)
    if response_message:
        idea_agent.handle_response(response_message)
    else:
        print("IdeaGeneratorAgent found no response immediately.")

    print("\n--- Simulation Ended ---")

if __name__ == "__main__":
    # Make sure OPENAI_API_KEY is set in your environment
    if not os.getenv("OPENAI_API_KEY"):
        print("Error: OPENAI_API_KEY environment variable not set.")
        print("Please set it before running the script.")
    else:
        run_simulation()
```

## Running the Example and Observing the Output

To run this:

1. Save all the Python code (the helper functions, agent classes, and simulation logic) into a single .py file (e.g., two_agent_comm.py).
2. Set your OPENAI_API_KEY environment variable.
3. Execute the script from your terminal: `python two_agent_comm.py`

You should see output similar to the following (the exact LLM responses will vary):

```
--- Starting Two-Agent Communication Simulation ---

[IdeaGenerator_001] Generating content idea...
[IdeaGenerator_001] Generated Topic: Exploring: Serverless Functions and WebAssembly for Edge Computing

[MESSAGE SENT] From: IdeaGenerator_001 To: DraftWriter_001 Type: IDEA_PROPOSAL
Payload: {
  "topic": "Exploring: Serverless Functions and WebAssembly for Edge Computing"
}

[DraftWriter_001] Listening for incoming tasks...

[MESSAGE RECEIVED] By: DraftWriter_001 From: IdeaGenerator_001 Type: IDEA_PROPOSAL
Payload: {
  "topic": "Exploring: Serverless Functions and WebAssembly for Edge Computing"
}

[DraftWriter_001] Received topic: 'Exploring: Serverless Functions and WebAssembly for Edge Computing'. Generating draft...

[MESSAGE SENT] From: DraftWriter_001 To: IdeaGenerator_001 Type: DRAFT_RESPONSE
Payload: {
  "topic": "Exploring: Serverless Functions and WebAssembly for Edge Computing",
  "draft": "The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is set to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.\n\nImagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This approach not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless approaches with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems."
}

[MESSAGE RECEIVED] By: IdeaGenerator_001 From: DraftWriter_001 Type: DRAFT_RESPONSE
Payload: {
  "topic": "Exploring: Serverless Functions and WebAssembly for Edge Computing",
  "draft": "The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is set to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.\n\nImagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This approach not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless approaches with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems."
}

[IdeaGenerator_001] Received draft for topic: 'Exploring: Serverless Functions and WebAssembly for Edge Computing'
[IdeaGenerator_001] Draft Content:
The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is set to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.

Imagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This approach not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless approaches with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems.

--- Simulation Ended ---
```
## Discussion and Further Extensions

This hands-on exercise demonstrated a basic but complete communication loop between two LLM-powered agents. Key takeaways include:

- **The importance of a protocol**: A well-defined message structure (sender_id, receiver_id, message_type, payload, session_id) is fundamental to reliable inter-agent communication.
- **Agent specialization**: Each agent has a clear role, making the system easier to understand and manage.
- **Session management**: Using a session_id helps correlate requests with responses, which matters in systems handling multiple concurrent interactions.
- **Simulated communication channel**: We used a simple list (message_queue) as a message bus. In a production system, you would adopt message queue technology (e.g., RabbitMQ, Kafka, Redis Streams) or a dedicated peer-to-peer communication library to handle message delivery, persistence, and scaling.

To extend this example, consider:

- **Asynchronous communication**: Modify the agents to run independently, perhaps in separate threads or using Python's asyncio library, to realistically simulate concurrent operation (see the sketch at the end of this section).
- **Error handling**: Implement more sophisticated error handling. What if the DraftWriterAgent still cannot generate a draft after several attempts? It could send a specific DRAFT_FAILED message.
- **Adding a third agent**: Introduce a ReviewerAgent that receives the draft from the DraftWriterAgent, provides feedback (perhaps using another LLM call), and sends it on to the ContentIdeaGeneratorAgent or back to the DraftWriterAgent for revision. This introduces a more complex workflow.
- **Different communication patterns**: Investigate broadcast messaging (one agent sending to many) or subscription models.

By building and experimenting with this simple system, you have taken an important step toward understanding the practicalities of inter-agent communication in multi-agent LLM systems. The principles applied here lay the foundation for more complex and capable agent collaborations.
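As a starting point for the asynchronous extension mentioned above, here is a minimal sketch using asyncio.Queue as a per-agent mailbox. The names (writer_loop, to_writer, to_generator) are illustrative, and the LLM call is stubbed out so the sketch runs without an API key:

```python
import asyncio
import uuid

async def writer_loop(inbox: asyncio.Queue, outbox: asyncio.Queue):
    """DraftWriterAgent as a long-running task: wait for a proposal, reply."""
    while True:
        message = await inbox.get()
        if message["message_type"] == "IDEA_PROPOSAL":
            await outbox.put({
                "sender_id": "DraftWriter_001",
                "receiver_id": message["sender_id"],
                "message_id": str(uuid.uuid4()),
                "session_id": message["session_id"],
                "message_type": "DRAFT_RESPONSE",
                # A real agent would call the LLM here; a stub draft keeps
                # this sketch self-contained.
                "payload": {"topic": message["payload"]["topic"],
                            "draft": "(generated draft would go here)"},
            })

async def main():
    to_writer: asyncio.Queue = asyncio.Queue()     # writer's mailbox
    to_generator: asyncio.Queue = asyncio.Queue()  # generator's mailbox
    writer_task = asyncio.create_task(writer_loop(to_writer, to_generator))

    # The generator sends a proposal and awaits the reply while the writer
    # runs concurrently as its own task.
    await to_writer.put({
        "sender_id": "IdeaGenerator_001", "receiver_id": "DraftWriter_001",
        "message_id": str(uuid.uuid4()), "session_id": str(uuid.uuid4()),
        "message_type": "IDEA_PROPOSAL",
        "payload": {"topic": "The future of serverless computing with WebAssembly."},
    })
    response = await to_generator.get()
    print(response["message_type"], "for topic:", response["payload"]["topic"])
    writer_task.cancel()

asyncio.run(main())
```

Each agent becomes an independent task blocked on its own queue, which is much closer to how agents behave behind a real message broker than the single-pass listen_for_work call used in the demo.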