本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。
本文为该课程的第四章(多智能体开发)的第三篇笔记。主要是对课程刚开始环境搭建成功后跑通的第一个MetaGPT多智能体案例 - 多智能体辩论,进行学习,并拆解其中的实现步骤和原理。
系列笔记
- 【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑
- 【AI Agent系列】【MetaGPT多智能体学习】1. 再理解 AI Agent - 经典案例和热门框架综述
- 【AI Agent系列】【MetaGPT多智能体学习】2. 重温单智能体开发 - 深入源码,理解单智能体运行框架
- 【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制
- 【AI Agent系列】【MetaGPT多智能体学习】4. 基于MetaGPT的Team组件开发你的第一个智能体团队
多智能体辩论需求。假设是两个智能体进行辩论。
0. 辩论Action的定义
辩论进行的动作都一样,所以Action可以共用一个。除了 Prompt 需要费点功夫想,其它的流程都和之前一样,平平无奇。
class SpeakAloud(Action): """Action: Speak out aloud in a debate (quarrel)""" PROMPT_TEMPLATE: str = """ ## BACKGROUND Suppose you are {name}, you are in a debate with {opponent_name}. ## DEBATE HISTORY Previous rounds: {context} ## YOUR TURN Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments, craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue: """ name: str = "SpeakAloud" async def run(self, context: str, name: str, opponent_name: str): prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name) # logger.info(prompt) rsp = await self._aask(prompt) return rsp
1. 辩论智能体的定义
智能体的职能也一样,所以可以共用一个Role的构造。
1.1 让两个智能体交替运行的重点
它的Action就是上面的辩论Action,通过 self.set_actions([SpeakAloud])
设置。
它的动作时机通过 self._watch([UserRequirement, SpeakAloud])
来设置,可以看到它观察环境中出现 UserRequirement
或 SpeakAloud
来源的消息时,会有所动作。
那么问题来了,现在辩论的智能体Role和Action都只有一个,Role1说完之后放到环境中的信息来源是 SpeakAloud,Role2说完后放到环境中的也是 SpeakAloud,如果还是之前的代码,那当环境中出现 SpeakAloud时,Role1 和 Role2 都会触发动作了。这是不对的。
我们需要的是Role1等待Role2说完再说,Role2等待Role1说完后再说,交替着表达。
怎么实现这一点呢?回想我们之前学的单智能体的运行周期,_observe —> _react(_think + _act) —> publish_message,能阻止智能体不动作的是 _observe 函数。
1.2 重写 _observe 函数
先来看下原_observe函数的实现:
async def _observe(self, ignore_memory=False) -> int: """Prepare new messages for processing from the message buffer and other sources.""" # Read unprocessed messages from the msg buffer. news = [] if self.recovered: news = [self.latest_observed_msg] if self.latest_observed_msg else [] if not news: news = self.rc.msg_buffer.pop_all() # Store the read messages in your own memory to prevent duplicate processing. old_messages = [] if ignore_memory else self.rc.memory.get() self.rc.memory.add_batch(news) # Filter out messages of interest. self.rc.news = [ n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages ] self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg # Design Rules: # If you need to further categorize Message objects, you can do so using the Message.set_meta function. # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer. news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news] if news_text: logger.debug(f"{self._setting} observed: {news_text}") return len(self.rc.news)
它的结果 len(self.rc.news)
只要大于0,这个智能体就会执行 _react
,开始行动。self.rc.news
怎么来的?重点看源码中的这句:
# Filter out messages of interest. self.rc.news = [ n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages ]
self.rc.news
是从 news
中过滤出了该智能体所关心的消息 n.cause_by in self.rc.watch
和 指定发送给该智能体的消息 self.name in n.send_to
。而 news
其实就是 msg_buffer
,msg_buffer
来自环境的 publish_message
。
再来看下环境 publish_message
的代码:
def publish_message(self, message: Message, peekable: bool = True) -> bool: """ Distribute the message to the recipients. In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned in RFC 113 for the entire system, the routing information in the Message is only responsible for specifying the message recipient, without concern for where the message recipient is located. How to route the message to the message recipient is a problem addressed by the transport framework designed in RFC 113. """ logger.debug(f"publish_message: {message.dump()}") found = False # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113 for role, addrs in self.member_addrs.items(): if is_send_to(message, addrs): role.put_message(message) found = True if not found: logger.warning(f"Message no recipients: {message.dump()}") self.history += f"\n{message}" # For debug return True
看里面的 is_send_to
函数:
def is_send_to(message: "Message", addresses: set): """Return whether it's consumer""" if MESSAGE_ROUTE_TO_ALL in message.send_to: return True for i in addresses: if i in message.send_to: return True return False
如果指定了 message.send_to
,则环境只会将该消息发送给指定的Role
。
光看上面的代码和我的文字可能不是很好理解,容易绕晕,我给你们画了个图:
如上图的流程,这对于我们的辩论智能体来说其实就够了。Role1执行完动作后,将结果消息指定发送给Role2(通过message的send_to参数),这样Role1的msg_buffer中就不会出现它自身的这个消息,下次run时msg_buffer就直接是空的,就不运行了,等待Role2的消息。同样的,Role2执行完后将消息指定发送给Role1。这样就实现了交互行动,辩论的过程。
教程中用的0.6.6版本,不知道源码是不是这样。但是它为了实现这个辩论过程,还重写了 _observe 函数:
async def _observe(self) -> int: await super()._observe() # accept messages sent (from opponent) to self, disregard own messages from the last round self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # 第二次筛选 return len(self.rc.news)
可以参考下。但是我用的0.7.2版本,看源码和实践证明,不用重写_observe了。
1.3 重写 _act 函数
这里为什么要重写 _act
函数,最主要的原因是要填执行完结果消息中的 send_to
参数(源码中是没有填的)。还有就是组装辩论的上下文(对手说了什么 context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
)。
async def _act(self) -> Message: logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})") todo = self.rc.todo # An instance of SpeakAloud memories = self.get_memories() context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories) print(context) rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name) msg = Message( content=rsp, role=self.profile, cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, ) self.rc.memory.add(msg) return msg
cause_by=type(todo)
, sent_from=self.name
, send_to=self.opponent_name
,
这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。
给大家再看一下组装的上下文的示例,有个更直观的认识(Trump发言时,收到的上文是Biden及其之前发言的内容):
1.4 最终Role的代码
class Debator(Role): name: str = "" profile: str = "" opponent_name: str = "" def __init__(self, **data: Any): super().__init__(**data) self.set_actions([SpeakAloud]) self._watch([UserRequirement, SpeakAloud]) ## 0.7.2 版本可以不用重写 _observe,具体原因分析见上文 # async def _observe(self) -> int: # await super()._observe() # # accept messages sent (from opponent) to self, disregard own messages from the last round # self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # return len(self.rc.news) async def _act(self) -> Message: logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})") todo = self.rc.todo # An instance of SpeakAloud memories = self.get_memories() context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories) print(context) rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name) msg = Message( content=rsp, role=self.profile, cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, ) self.rc.memory.add(msg) return msg
2. 实例化智能体
实例化两个智能体,Team组件的用法前面已经学习过,可参考我的上篇文章。
其中值得注意的是,team.run_project(idea, send_to="Biden")
通过 send_to
参数指定谁先发言。
async def debate(idea: str, investment: float = 3.0, n_round: int = 5): """Run a team of presidents and watch they quarrel. :)""" Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump") Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden") team = Team() team.hire([Biden, Trump]) team.invest(investment) team.run_project(idea, send_to="Biden") # send debate topic to Biden and let him speak first await team.run(n_round=n_round)
3. 完整代码及运行结果
完整代码:
import asyncio import platform from typing import Any import fire from metagpt.actions import Action, UserRequirement from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message from metagpt.team import Team class SpeakAloud(Action): """Action: Speak out aloud in a debate (quarrel)""" PROMPT_TEMPLATE: str = """ ## BACKGROUND Suppose you are {name}, you are in a debate with {opponent_name}. ## DEBATE HISTORY Previous rounds: {context} ## YOUR TURN Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments, craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue: """ name: str = "SpeakAloud" async def run(self, context: str, name: str, opponent_name: str): prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name) # logger.info(prompt) rsp = await self._aask(prompt) return rsp class Debator(Role): name: str = "" profile: str = "" opponent_name: str = "" def __init__(self, **data: Any): super().__init__(**data) self.set_actions([SpeakAloud]) self._watch([UserRequirement, SpeakAloud]) # async def _observe(self) -> int: # await super()._observe() # # accept messages sent (from opponent) to self, disregard own messages from the last round # self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # return len(self.rc.news) async def _act(self) -> Message: logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})") todo = self.rc.todo # An instance of SpeakAloud memories = self.get_memories() context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories) print(context) rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name) msg = Message( content=rsp, role=self.profile, cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, ) self.rc.memory.add(msg) return msg async def debate(idea: str, investment: float = 3.0, n_round: int = 5): """Run a team of presidents and watch they quarrel. :)""" Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump") Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden") team = Team() team.hire([Biden, Trump]) team.invest(investment) team.run_project(idea, send_to="Biden") # send debate topic to Biden and let him speak first await team.run(n_round=n_round) def main(idea: str, investment: float = 3.0, n_round: int = 10): """ :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting" or "Trump: Climate change is a hoax" :param investment: contribute a certain dollar amount to watch the debate :param n_round: maximum rounds of the debate :return: """ if platform.system() == "Windows": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(debate(idea, investment, n_round)) if __name__ == "__main__": fire.Fire(main("Topic: The U.S. should commit more in climate change fighting"))
运行结果(交替发言):
4. 总结
这节内容最主要的是让我们了解和实践了多智能体间更多的消息交互和订阅机制,主要是 send_to
参数的使用。
cause_by=type(todo)
,sent_from=self.name
,send_to=self.opponent_name
, 这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。
站内文章一览