This practical session guides you through building a foundational two-agent system where agents communicate using a defined protocol to accomplish a shared objective. We'll focus on the mechanics of message passing and structuring, building upon the principles discussed earlier in this chapter.
Our goal is to create a system with two agents: a ContentIdeaGeneratorAgent and a DraftWriterAgent.
The ContentIdeaGeneratorAgent will propose a topic for a short blog post. The DraftWriterAgent will receive the topic, use an LLM to generate a short draft (a few paragraphs) on that topic, and then send the draft back to the ContentIdeaGeneratorAgent.
This exercise will solidify your understanding of message-based communication, agent roles, and basic LLM integration within a multi-agent context.
Before writing any code, it's important to define how our agents will talk to each other. A clear protocol ensures that messages are understood correctly. We'll use JSON for our message format due to its simplicity and widespread use.
A message will have the following structure:
{
  "sender_id": "string",    // ID of the agent sending the message
  "receiver_id": "string",  // ID of the intended recipient agent
  "message_id": "string",   // Unique ID for the message
  "session_id": "string",   // ID for the conversation or task session
  "message_type": "string", // Type of message (e.g., "IDEA_PROPOSAL", "DRAFT_RESPONSE")
  "payload": {              // Content of the message
    // ... specific content based on message_type
  }
}
For our scenario, we'll need two main message types:
- IDEA_PROPOSAL: sent from the ContentIdeaGeneratorAgent to the DraftWriterAgent. payload: {"topic": "suggested topic string"}
- DRAFT_RESPONSE: sent from the DraftWriterAgent back to the ContentIdeaGeneratorAgent. payload: {"topic": "original topic string", "draft": "generated draft string"}
The interaction flow is as follows: the ContentIdeaGeneratorAgent initiates communication by sending an IDEA_PROPOSAL message to the DraftWriterAgent. The DraftWriterAgent processes this request and replies with a DRAFT_RESPONSE message containing the generated content.
Ensure you have Python 3.8+ installed. For this exercise, we'll use the OpenAI API for LLM capabilities. Make sure you have the openai library installed (pip install openai) and your OpenAI API key configured as an environment variable (OPENAI_API_KEY).
We'll define simple classes for our agents. In a more complex system, you might use a multi-agent framework, but for learning the communication fundamentals, a direct implementation is very instructive.
Let's start by creating a helper function for interacting with the LLM. This abstracts the API call.
import os
import uuid
import json
from openai import OpenAI

# Initialize the OpenAI client
# Ensure your OPENAI_API_KEY environment variable is set
client = OpenAI()

def generate_llm_response(prompt_text, model="gpt-3.5-turbo", max_tokens=200):
    """
    Generates a response from the LLM based on the prompt.
    """
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt_text}],
            max_tokens=max_tokens,
            temperature=0.7
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling LLM: {e}")
        return "Error: Could not generate LLM response."

# We'll use a simple in-memory message queue for this example.
# In a real system, this would be a robust message broker like RabbitMQ or Kafka.
message_queue = []

def send_message(sender_id, receiver_id, message_type, payload, session_id):
    """
    Simulates sending a message by adding it to our queue.
    """
    message = {
        "sender_id": sender_id,
        "receiver_id": receiver_id,
        "message_id": str(uuid.uuid4()),
        "session_id": session_id,
        "message_type": message_type,
        "payload": payload
    }
    print(f"\n[MESSAGE SENT] From: {sender_id} To: {receiver_id} Type: {message_type}")
    print(f"Payload: {json.dumps(payload, indent=2)}")
    message_queue.append(message)

def receive_message(agent_id):
    """
    Simulates an agent checking its mailbox (our queue).
    Returns the first message intended for this agent, or None.
    """
    for i, msg in enumerate(message_queue):
        if msg["receiver_id"] == agent_id:
            message = message_queue.pop(i)
            print(f"\n[MESSAGE RECEIVED] By: {agent_id} From: {message['sender_id']} Type: {message['message_type']}")
            print(f"Payload: {json.dumps(message['payload'], indent=2)}")
            return message
    return None
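If you want to sanity-check these helpers in isolation before wiring up the agents, a throwaway snippet like the one below exercises the round trip. It is not part of the main script, and the agent IDs and PING message type are made up purely for this check:

# Throwaway sanity check for the queue helpers (not part of the main script).
session = str(uuid.uuid4())
send_message("AgentA", "AgentB", "PING", {"note": "hello"}, session)

msg = receive_message("AgentB")            # AgentB picks up the PING
assert msg is not None and msg["message_type"] == "PING"
assert receive_message("AgentB") is None   # queue is now empty for AgentB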
This agent will initiate the process.
class ContentIdeaGeneratorAgent:
    def __init__(self, agent_id="IdeaGenerator_001"):
        self.agent_id = agent_id
        self.current_session_id = None

    def propose_idea(self, idea_prompt="Suggest a compelling topic for a short tech blog post aimed at developers."):
        """
        Generates a topic idea using an LLM and sends it to the DraftWriterAgent.
        """
        print(f"\n[{self.agent_id}] Generating content idea...")
        topic = generate_llm_response(idea_prompt, max_tokens=50)

        if "Error:" in topic:
            print(f"[{self.agent_id}] Failed to generate topic: {topic}")
            return

        print(f"[{self.agent_id}] Generated Topic: {topic}")
        self.current_session_id = str(uuid.uuid4())  # Start a new session

        payload = {"topic": topic}
        send_message(
            sender_id=self.agent_id,
            receiver_id="DraftWriter_001",  # Target agent ID
            message_type="IDEA_PROPOSAL",
            payload=payload,
            session_id=self.current_session_id
        )

    def handle_response(self, message):
        """
        Processes the draft response from the DraftWriterAgent.
        """
        if message and message["message_type"] == "DRAFT_RESPONSE":
            if message["session_id"] == self.current_session_id:
                print(f"\n[{self.agent_id}] Received draft for topic: '{message['payload']['topic']}'")
                print(f"[{self.agent_id}] Draft Content:\n{message['payload']['draft']}")
                self.current_session_id = None  # End session
            else:
                print(f"[{self.agent_id}] Received response for an old or unknown session: {message['session_id']}")
        else:
            print(f"[{self.agent_id}] Received an unexpected message or no message.")
This agent waits for a topic and then writes a draft.
class DraftWriterAgent:
    def __init__(self, agent_id="DraftWriter_001"):
        self.agent_id = agent_id

    def process_idea_proposal(self, message):
        """
        Receives a topic, generates a draft, and sends it back.
        """
        if message and message["message_type"] == "IDEA_PROPOSAL":
            topic = message["payload"]["topic"]
            session_id = message["session_id"]
            print(f"\n[{self.agent_id}] Received topic: '{topic}'. Generating draft...")

            draft_prompt = f"Write a short, engaging blog post draft (2-3 paragraphs) on the topic: '{topic}'. The tone should be informative and slightly informal."
            draft = generate_llm_response(draft_prompt, max_tokens=300)

            if "Error:" in draft:
                print(f"[{self.agent_id}] Failed to generate draft for topic '{topic}': {draft}")
                # Optionally send an error message back
                payload = {"topic": topic, "error": "Failed to generate draft"}
                send_message(
                    sender_id=self.agent_id,
                    receiver_id=message["sender_id"],
                    message_type="DRAFT_ERROR",  # A new message type for errors
                    payload=payload,
                    session_id=session_id
                )
                return

            response_payload = {"topic": topic, "draft": draft}
            send_message(
                sender_id=self.agent_id,
                receiver_id=message["sender_id"],
                message_type="DRAFT_RESPONSE",
                payload=response_payload,
                session_id=session_id
            )
        else:
            print(f"[{self.agent_id}] Received an unexpected message type or no message.")

    def listen_for_work(self):
        """
        Agent's main loop to check for messages.
        In a real application, this would be event-driven or use a proper message queue listener.
        """
        print(f"\n[{self.agent_id}] Listening for incoming tasks...")
        message = receive_message(self.agent_id)
        if message:
            self.process_idea_proposal(message)
            return True  # Indicates work was done
        return False  # No message processed
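In this walkthrough we call listen_for_work exactly once, but a long-lived agent would check its mailbox continuously. A simple, hypothetical polling wrapper (not used in the simulation below) could look like this; in production, an event-driven listener on a real message broker would replace it:

import time

def run_forever(agent, poll_interval=1.0):
    """Naive polling loop; a real deployment would use an event-driven listener."""
    while True:
        if not agent.listen_for_work():
            time.sleep(poll_interval)  # nothing to do, back off briefly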
Now, let's instantiate these agents and run the simulation.
def run_simulation():
    # Instantiate agents
    idea_agent = ContentIdeaGeneratorAgent()
    writer_agent = DraftWriterAgent()

    print("--- Starting Two-Agent Communication Simulation ---")

    # 1. IdeaGeneratorAgent proposes an idea
    idea_agent.propose_idea("The future of serverless computing with WebAssembly.")

    # 2. DraftWriterAgent listens for work and processes the idea
    # In a real system, agents would run in separate threads/processes and listen continuously.
    # Here, we simulate by explicitly calling listen_for_work.
    work_done = writer_agent.listen_for_work()
    if not work_done:
        print("DraftWriterAgent found no work immediately.")
        # In a real scenario, it would keep listening. For this demo, it means the message wasn't picked up.

    # 3. IdeaGeneratorAgent listens for the response
    # We need to ensure the idea_agent also checks its "mailbox"
    response_message = receive_message(idea_agent.agent_id)
    if response_message:
        idea_agent.handle_response(response_message)
    else:
        print("IdeaGeneratorAgent found no response immediately.")

    print("\n--- Simulation Ended ---")

if __name__ == "__main__":
    # Ensure OPENAI_API_KEY is set in your environment
    if not os.getenv("OPENAI_API_KEY"):
        print("Error: OPENAI_API_KEY environment variable not set.")
        print("Please set it before running the script.")
    else:
        run_simulation()
To run this:
1. Save the code into a single .py file (e.g., two_agent_comm.py).
2. Set your OPENAI_API_KEY environment variable.
3. Run the script: python two_agent_comm.py
You should see output similar to this (exact LLM responses will vary):
--- Starting Two-Agent Communication Simulation ---
[IdeaGenerator_001] Generating content idea...
[IdeaGenerator_001] Generated Topic: Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing
[MESSAGE SENT] From: IdeaGenerator_001 To: DraftWriter_001 Type: IDEA_PROPOSAL
Payload: {
"topic": "Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing"
}
[DraftWriter_001] Listening for incoming tasks...
[MESSAGE RECEIVED] By: DraftWriter_001 From: IdeaGenerator_001 Type: IDEA_PROPOSAL
Payload: {
"topic": "Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing"
}
[DraftWriter_001] Received topic: 'Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing'. Generating draft...
[MESSAGE SENT] From: DraftWriter_001 To: IdeaGenerator_001 Type: DRAFT_RESPONSE
Payload: {
"topic": "Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing",
"draft": "The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is poised to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.\n\nImagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This synergy not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless paradigms with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems."
}
[MESSAGE RECEIVED] By: IdeaGenerator_001 From: DraftWriter_001 Type: DRAFT_RESPONSE
Payload: {
"topic": "Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing",
"draft": "The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is poised to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.\n\nImagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This synergy not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless paradigms with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems."
}
[IdeaGenerator_001] Received draft for topic: 'Exploring the Synergy: Serverless Functions and WebAssembly for Edge Computing'
[IdeaGenerator_001] Draft Content:
The worlds of serverless computing and WebAssembly (Wasm) are on a collision course, and the impact, particularly at the edge, is poised to be significant. Serverless functions have long promised reduced operational overhead and scalable compute, but often come with cold start trade-offs and language limitations. WebAssembly, a binary instruction format for a stack-based virtual machine, offers near-native performance, a compact size, and language flexibility, making it an ideal companion for serverless architectures.
Imagine deploying highly efficient, sandboxed Wasm modules as your serverless functions. These modules can be written in languages like Rust, C++, or Go, compiled to Wasm, and then executed swiftly at edge locations closer to your users. This synergy not only addresses cold start issues due to Wasm's quick instantiation times but also enhances security through its well-defined sandboxing model. As edge computing demands more responsive and resource-conscious applications, the combination of serverless paradigms with WebAssembly's performance and portability presents a compelling future for developers building next-generation distributed systems.
--- Simulation Ended ---
This hands-on exercise demonstrates a basic yet complete communication loop between two LLM-powered agents. Key takeaways include:
- A clearly defined message structure (sender_id, receiver_id, message_type, payload, session_id) is fundamental for reliable inter-agent communication.
- The session_id helps correlate requests and responses, which is important in systems handling multiple concurrent interactions.
- We used a simple in-memory list (message_queue) as our message bus. In production systems, you'd employ robust message queue technologies (e.g., RabbitMQ, Kafka, Redis Streams) or dedicated P2P communication libraries to handle message delivery, persistence, and scaling.

To extend this example, consider:
- Running each agent in its own process, thread, or with Python's asyncio library, truly simulating concurrent operation (a minimal sketch appears at the end of this section).
- Adding more robust error handling: what happens if the DraftWriterAgent fails to generate a draft after several retries? It could send a specific DRAFT_FAILED message.
- Introducing a ReviewerAgent that receives the draft from the DraftWriterAgent, provides feedback (perhaps using another LLM call), and then sends it to the ContentIdeaGeneratorAgent or back to the DraftWriterAgent for revisions. This would introduce a more complex workflow.

By building and experimenting with this simple system, you've taken a significant step towards understanding the practicalities of inter-agent communication in multi-agent LLM systems. The principles applied here form the bedrock for more complex and capable agent collaborations.
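As a starting point for the first extension (running the agents concurrently with asyncio), here is a minimal, hypothetical sketch. It is deliberately not part of the main script: the real LLM call is replaced by a stub (fake_llm) so it runs without an API key, each agent gets its own asyncio.Queue as a mailbox, and the names and structure are illustrative only.

import asyncio
import uuid

async def fake_llm(prompt: str) -> str:
    # Stand-in for a real LLM call so the sketch runs offline.
    await asyncio.sleep(0.1)
    return f"(LLM output for: {prompt[:40]}...)"

async def idea_generator(outbox: asyncio.Queue, inbox: asyncio.Queue):
    topic = await fake_llm("Suggest a blog post topic")
    session_id = str(uuid.uuid4())
    await outbox.put({"message_type": "IDEA_PROPOSAL",
                      "session_id": session_id,
                      "payload": {"topic": topic}})
    reply = await inbox.get()  # wait for the DRAFT_RESPONSE
    print("IdeaGenerator received:", reply["payload"]["draft"])

async def draft_writer(inbox: asyncio.Queue, outbox: asyncio.Queue):
    msg = await inbox.get()  # wait for the IDEA_PROPOSAL
    draft = await fake_llm(f"Write a draft on {msg['payload']['topic']}")
    await outbox.put({"message_type": "DRAFT_RESPONSE",
                      "session_id": msg["session_id"],
                      "payload": {"topic": msg["payload"]["topic"], "draft": draft}})

async def main():
    to_writer, to_generator = asyncio.Queue(), asyncio.Queue()
    # Both agents run concurrently; each blocks only on its own mailbox.
    await asyncio.gather(idea_generator(to_writer, to_generator),
                         draft_writer(to_writer, to_generator))

asyncio.run(main())

The key difference from the synchronous version above is that each agent awaits its own inbox, so neither blocks the other while waiting for work.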