前言

Qwen-Agent是一个开发框架。开发者可基于本框架开发Agent应用,充分利用基于通义千问模型(Qwen)的指令遵循、工具使用、规划、记忆能力。本框架也提供了浏览器助手、代码解释器、自定义助手等示例应用,该篇为系列3。

【通义千问——Qwen-Agent系列文章】:
【通义千问—Qwen-Agent系列1】Qwen-Agent 快速开始&使用和开发过程.
【通义千问—Qwen-Agent系列2】Qwen-Agent 的案例分析(图像理解&图文生成Agent||多模态助手|| 基于ReAct范式的数据分析Agent)
【通义千问—Qwen-Agent系列3】Qwen-Agent 的案例分析(五子棋游戏&多Agent冒险游戏&多智能体群组交流)

一、快速开始

1-1、介绍

Qwen-Agent: 是一个开发框架。开发者可基于本框架开发Agent应用,充分利用基于通义千问模型(Qwen)的指令遵循、工具使用、规划、记忆能力。本项目也提供了浏览器助手、代码解释器、自定义助手等示例应用。

在这里插入图片描述

1-2、安装

1、使用pip安装:

pip install -U qwen-agent

2、从Github安装最新版本

git clone https://github.com/QwenLM/Qwen-Agent.git
cd Qwen-Agent
pip install -e ./

1-3、开发你自己的Agent

概述:下面的示例说明了创建一个能够读取PDF文件和利用工具的代理的过程,以及构建自定义工具,以下为详细介绍:

  • 添加一个自定义工具:图片生成工具
  • 使用到的LLM模型配置。
  • 创建Agent,这里我们以“Assistant”代理为例,它能够使用工具和读取文件。
  • 以聊天机器人的形式运行助理。
import pprint
import urllib.parse
import json5
from qwen_agent.agents import Assistant
from qwen_agent.tools.base import BaseTool, register_tool

# Step 1 (Optional): Add a custom tool named `my_image_gen`.
@register_tool('my_image_gen')
class MyImageGen(BaseTool):
    # The `description` tells the agent the functionality of this tool.
    description = 'AI painting (image generation) service, input text description, and return the image URL drawn based on text information.'
    # The `parameters` tell the agent what input parameters the tool has.
    parameters = [{
        'name': 'prompt',
        'type': 'string',
        'description': 'Detailed description of the desired image content, in English',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        # `params` are the arguments generated by the LLM agent.
        prompt = json5.loads(params)['prompt']
        # 对提示词进行URL编码
        prompt = urllib.parse.quote(prompt)
        # 
        return json5.dumps(
            {'image_url': f'https://image.pollinations.ai/prompt/{prompt}'},
            ensure_ascii=False)


# Step 2: Configure the LLM you are using.
# 这里是需要配置模型的地方。需要填写模型名字,以及model_server,即模型所在服务器名字,如果没有,也可以考虑使用api_key。
llm_cfg = {
    # Use the model service provided by DashScope:
	# model:模型名称
	# model_server:模型所在的服务器
	# api_key: 所使用到的api-key,可以显示的设置,也可以从环境变量中获取
	
    'model': 'qwen-max',
    'model_server': 'dashscope',
    # 'api_key': 'YOUR_DASHSCOPE_API_KEY',
    # It will use the `DASHSCOPE_API_KEY' environment variable if 'api_key' is not set here.

    # Use a model service compatible with the OpenAI API, such as vLLM or Ollama:
    # 'model': 'Qwen1.5-7B-Chat',
    # 'model_server': 'http://localhost:8000/v1',  # base_url, also known as api_base
    # 'api_key': 'EMPTY',

    # (Optional) LLM hyperparameters for generation:
    # 用于调整生成参数的可选配置
    'generate_cfg': {
        'top_p': 0.8
    }
}

# Step 3: Create an agent. Here we use the `Assistant` agent as an example, which is capable of using tools and reading files.

# agent的提示词指令
system_instruction = '''You are a helpful assistant.
After receiving the user's request, you should:
- first draw an image and obtain the image url,
- then run code `request.get(image_url)` to download the image,
- and finally select an image operation from the given document to process the image.
Please show the image using `plt.show()`.'''

# 工具列表,指定Assistant可以访问的工具,一个是自定义的工具,一个是代码执行器
tools = ['my_image_gen', 'code_interpreter']  # `code_interpreter` is a built-in tool for executing code.
# 助理可以读取的文件路径
files = ['./examples/resource/doc.pdf']  # Give the bot a PDF file to read.

# 初始化Assistant
bot = Assistant(llm=llm_cfg,
                system_message=system_instruction,
                function_list=tools,
                files=files)

# Step 4: Run the agent as a chatbot.
messages = []  # This stores the chat history.
while True:
    # For example, enter the query "draw a dog and rotate it 90 degrees".
    query = input('user query: ')
    # Append the user query to the chat history.
    messages.append({'role': 'user', 'content': query})
    response = []
    for response in bot.run(messages=messages):
        # Streaming output.
        print('bot response:')
        pprint.pprint(response, indent=2)
    # Append the bot responses to the chat history.
    messages.extend(response)
  • 首先输入任务目标:draw a dog and rotate it 90 degrees
    在这里插入图片描述

  • 绘制的狗子图片:
    在这里插入图片描述

  • 结果输出:
    在这里插入图片描述

  • Agent处理后的狗子图片展示:

在这里插入图片描述

二、基于Qwen-Agent的案例分析

2-0、环境安装

# 更新qwen_agent  以及 modelscope-studio
pip install --upgrade qwen_agent  
pip install --upgrade modelscope-studio

2-1、五子棋游戏实现

概述:这段代码实现了一个基于多智能体的五子棋游戏,其中一个玩家是人类,另一个玩家是非玩家角色(NPC),并使用Web用户界面(WebUI)进行交互。以下是对这段代码的详细解释:

代码结构
1、导入模块:

GroupChat:用于多智能体聊天的主要类。
WebUI:用于创建Web用户界面的类。
Message:用于在聊天中传递消息的类。

2、配置文件 (CFGS):

定义了一个五子棋游戏的配置,包括背景描述、棋盘和两个玩家(NPC和人类)的角色设置。

配置文件详解

1、背景描述 (background):

  • 描述了五子棋的基本规则和棋盘大小。NPC下白棋,人类玩家下黑棋。

2、智能体 (agents):

  • 棋盘:负责更新和展示棋盘。棋盘用矩阵表示,0代表无棋子,1代表黑棋,-1代表白棋。
  • 小明:NPC玩家,下白棋。根据棋盘状态和规则决定下一步落子的位置。
  • 小塘:人类玩家,下黑棋。

以下为详细代码:

"""A chess play game implemented by group chat"""

from qwen_agent.agents import GroupChat
from qwen_agent.gui import WebUI
from qwen_agent.llm.schema import Message

# Define a configuration file for a multi-agent:
# one real player, one NPC player, and one chessboard
NPC_NAME = '小明'
USER_NAME = '小塘'
CFGS = {
    'background':
        f'一个五子棋群组,棋盘为5*5,黑棋玩家和白棋玩家交替下棋,每次玩家下棋后,棋盘进行更新并展示。{NPC_NAME}下白棋,{USER_NAME}下黑棋。',
    'agents': [
        {
            'name':
                '棋盘',
            'description':
                '负责更新棋盘',
            'instructions':
                '你扮演一个五子棋棋盘,你可以根据原始棋盘和玩家下棋的位置坐标,把新的棋盘用矩阵展示出来。棋盘中用0代表无棋子、用1表示黑棋、用-1表示白棋。用坐标<i,j>表示位置,i代表行,j代表列,棋盘左上角位置为<0,0>。',
            'selected_tools': ['code_interpreter'],
        },
        {
            'name':
                NPC_NAME,
            'description':
                '白棋玩家',
            'instructions':
                '你扮演一个玩五子棋的高手,你下白棋。棋盘中用0代表无棋子、用1黑棋、用-1白棋。用坐标<i,j>表示位置,i代表行,j代表列,棋盘左上角位置为<0,0>,请决定你要下在哪里,你可以随意下到一个位置,不要说你是AI助手不会下!返回格式为坐标:\n<i,j>\n除了这个坐标,不要返回其他任何内容',
        },
        {
            'name': USER_NAME,
            'description': '黑棋玩家',
            'is_human': True
        },
    ],
}


def test(query: str):
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})

    messages = [Message('user', query, name=USER_NAME)]
    for response in bot.run(messages=messages):
        print('bot response:', response)


def app_tui():
    # Define a group chat agent from the CFGS
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})
    # Chat
    messages = []
    while True:
        query = input('user question: ')
        messages.append(Message('user', query, name=USER_NAME))
        response = []
        for response in bot.run(messages=messages):
            print('bot response:', response)
        messages.extend(response)


def app_gui():
    # Define a group chat agent from the CFGS
    # GroupChat 类是一个多智能体管理的代理类,用于在一个群聊环境中管理多个智能体(agents)的对话顺序,并输出每个智能体的响应。
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})
    chatbot_config = {
        'user.name': '小塘',
        'prompt.suggestions': [
            '开始!我先手,落子 <1,1>',
            '我后手,请小明先开始',
            '新开一盘,我先开始',
        ],
        'verbose': True
    }

    WebUI(
        bot,
        chatbot_config=chatbot_config,
    ).run()


if __name__ == '__main__':
    # test()
    # app_tui()
    app_gui()

输出: 由于五子棋太无聊了不想输出。

2-2、多Agent冒险游戏

概述: 经过对五子棋游戏的一顿魔改实现多Agent冒险游戏。

游戏简介: 实现了一个多智能体的冒险类游戏,其中一个玩家是人类,其余玩家为NPC。在游戏中,玩家和队友将一起探索冒险世界,解决谜题并完成任务。每个NPC有不同的技能,Guide智能体会在每次发言后描述当前情况并提供下一步的选择。

from qwen_agent.agents import GroupChat
from qwen_agent.gui import WebUI
from qwen_agent.llm.schema import Message

# Define a configuration file for a multi-agent adventure game:
# one real player, multiple NPC players, and an adventure world
NPC_GUIDE_NAME = 'Guide'
USER_NAME = '救世主大人'
TEAMMATE_NAMES_AND_SKILLS = {
    'Warrior': '战斗',
    'Mage': '魔法',
    'Archer': '弓箭',
    'Healer': '治疗',
    'Rogue': '潜行',
    'Bard': '音乐',
    'Druid': '自然魔法',
    'Paladin': '神圣力量'
}

CFGS = {
    'background': f'你在一个神秘的冒险世界中,这里充满了谜题和任务。{NPC_GUIDE_NAME}是你的向导,{USER_NAME}和他的队友们将一起探索这个世界。',
    'agents': [
        {
            'name': NPC_GUIDE_NAME,
            'description': '一个博学多识的向导,帮助玩家了解世界和完成任务',
            'instructions': '你扮演一个冒险世界的向导,帮助冒险者了解世界、解决谜题和完成任务。在每次其他人发言完毕之后,描述当下情况,描述其他agent的状态,对下一步应该做什么给出一些选择。',
            'selected_tools': [],
        },
        {
            'name': USER_NAME,
            'description': '一个勇敢的冒险者,探索神秘世界,解决谜题和完成任务',
            'is_human': True
        },
    ] + [
        {
            'name': name,
            'description': f'一名冒险者的队友,擅长{skill}技能,协助玩家探索和战斗。',
            'instructions': f'你扮演一名冒险者的队友,擅长{skill}技能,协助玩家探索和战斗。',
            'selected_tools': [],
        } for name, skill in TEAMMATE_NAMES_AND_SKILLS.items()
    ],
}

def test(query: str):
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})

    messages = [Message('user', query, name=USER_NAME)]
    for response in bot.run(messages=messages):
        print('bot response:', response)


def app_tui():
    # Define a group chat agent from the CFGS
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})
    # Chat
    messages = []
    while True:
        query = input('user question: ')
        messages.append(Message('user', query, name=USER_NAME))
        response = []
        for response in bot.run(messages=messages):
            print('bot response:', response)
        messages.extend(response)


def app_gui():
    # Define a group chat agent from the CFGS
    bot = GroupChat(agents=CFGS, llm={'model': 'qwen-max'})
    chatbot_config = {
        'user.name': USER_NAME,
        'prompt.suggestions': [
            '进入森林',
            '查看背包',
            '与向导对话',
            '与战士对话',
            '与法师对话',
            '与弓箭手对话',
            '与治疗师对话',
            '与潜行者对话',
            '与吟游诗人对话',
            '与德鲁伊对话',
            '与圣骑士对话',
        ],
        'verbose': True
    }

    WebUI(
        bot,
        chatbot_config=chatbot_config,
    ).run()


if __name__ == '__main__':
    # test()
    # app_tui()
    app_gui()

输出: 真不错,是我想要的效果!

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

2-N、 多智能体群组交流(太酷啦)

"""A group chat gradio demo"""
import json

import gradio as gr
import json5

from qwen_agent.agents import GroupChat, GroupChatCreator
from qwen_agent.agents.user_agent import PENDING_USER_INPUT
from qwen_agent.llm.schema import ContentItem, Message


def init_agent_service(cfgs):
    llm_cfg = {'model': 'qwen-max'}
    bot = GroupChat(agents=cfgs, llm=llm_cfg)
    return bot


def init_agent_service_create():
    llm_cfg = {'model': 'qwen-max'}
    bot = GroupChatCreator(llm=llm_cfg)
    return bot


# =========================================================
# Below is the gradio service: front-end and back-end logic
# =========================================================

app_global_para = {
    'messages': [],
    'messages_create': [],
    'is_first_upload': False,
    'uploaded_file': '',
    'user_interrupt': True
}

# Initialized group chat configuration
CFGS = {
    'background':
        '一个陌生人互帮互助群聊',
    'agents': [
        {
            'name': '小塘',
            'description': '一个勤劳的打工人,每天沉迷工作,日渐消瘦。(这是一个真实用户)',
            'is_human': True  # mark this as a real person
        },
        {
            'name': '甄嬛',
            'description': '一位后宫妃嫔',
            'instructions': '你是甄嬛,你正在想办法除掉皇后,你说话风格为文言文,每次说完话会调image_gen工具画一幅图,展示心情。',
            'knowledge_files': [],
            'selected_tools': ['image_gen']
        },
        {
            'name': 'ikun',
            'description': '熟悉蔡徐坤的动态',
            'instructions': '你是蔡徐坤的粉丝,说话很简短,喜欢用颜文字表达心情,你最近迷恋看《甄嬛传》',
            'knowledge_files': [],
            'selected_tools': []
        },
        {
            'name': '大头',
            'description': '是一个体育生,不喜欢追星',
            'instructions': '你是一个体育生,热爱运动,你不喜欢追星,你喜欢安利别人健身',
            'knowledge_files': [],
            'selected_tools': []
        }
    ]
}

MAX_ROUND = 3


def app(cfgs):
    # Todo: Reinstance every time or instance one time as global variable?
    cfgs = json5.loads(cfgs)
    bot = init_agent_service(cfgs=cfgs)

    # Record all mentioned agents: reply in order
    mentioned_agents_name = []

    for i in range(MAX_ROUND):
        messages = app_global_para['messages']
        print(i, messages)

        # Interrupt: there is new input from user
        if i == 0:
            app_global_para['user_interrupt'] = False
        if i > 0 and app_global_para['user_interrupt']:
            app_global_para['user_interrupt'] = False
            print('GroupChat is interrupted by user input!')
            # Due to the concurrency issue with Gradio, unable to call the second service simultaneously
            for rsp in app(json.dumps(cfgs, ensure_ascii=False)):
                yield rsp
            break
        # Record mentions into mentioned_agents_name list
        content = ''
        if messages:
            if isinstance(messages[-1].content, list):
                content = '\n'.join([x.text if x.text else '' for x in messages[-1].content]).strip()
            else:
                content = messages[-1].content.strip()
        if '@' in content:
            for x in content.split('@'):
                for agent in cfgs['agents']:
                    if x.startswith(agent['name']):
                        if agent['name'] not in mentioned_agents_name:
                            mentioned_agents_name.append(agent['name'])
                        break
        # Get one response from groupchat
        response = []
        try:
            display_history = _get_display_history_from_message()
            yield display_history
            for response in bot.run(messages, need_batch_response=False, mentioned_agents_name=mentioned_agents_name):
                if response:
                    if response[-1].content == PENDING_USER_INPUT:
                        # Stop printing the special message for mention human
                        break
                    incremental_history = []
                    for x in response:
                        function_display = ''
                        if x.function_call:
                            function_display = f'\nCall Function: {str(x.function_call)}'
                        incremental_history += [[None, f'{x.name}: {x.content}{function_display}']]
                    display_history = _get_display_history_from_message()
                    yield display_history + incremental_history

        except Exception as ex:
            raise ValueError(ex)

        if not response:
            # The topic ends
            print('No one wants to talk anymore!')
            break
        if mentioned_agents_name:
            assert response[-1].name == mentioned_agents_name[0]
            mentioned_agents_name.pop(0)

        if response and response[-1].content == PENDING_USER_INPUT:
            # Terminate group chat and wait for user input
            print('Waiting for user input!')
            break

        # Record the response to messages
        app_global_para['messages'].extend(response)


def test():
    app(cfgs=CFGS)


def app_create(history, now_cfgs):
    now_cfgs = json5.loads(now_cfgs)
    if not history:
        yield history, json.dumps(now_cfgs, indent=4, ensure_ascii=False)
    else:

        if len(history) == 1:
            new_cfgs = {'background': '', 'agents': []}
            # The first time to create grouchat
            exist_cfgs = now_cfgs['agents']
            for cfg in exist_cfgs:
                if 'is_human' in cfg and cfg['is_human']:
                    new_cfgs['agents'].append(cfg)
        else:
            new_cfgs = now_cfgs
        app_global_para['messages_create'].append(Message('user', history[-1][0]))
        response = []
        try:
            agent = init_agent_service_create()
            for response in agent.run(messages=app_global_para['messages_create']):
                display_content = ''
                for rsp in response:
                    if rsp.name == 'role_config':
                        cfg = json5.loads(rsp.content)
                        old_pos = -1
                        for i, x in enumerate(new_cfgs['agents']):
                            if x['name'] == cfg['name']:
                                old_pos = i
                                break
                        if old_pos > -1:
                            new_cfgs['agents'][old_pos] = cfg
                        else:
                            new_cfgs['agents'].append(cfg)

                        display_content += f'\n\n{cfg["name"]}: {cfg["description"]}\n{cfg["instructions"]}'
                    elif rsp.name == 'background':
                        new_cfgs['background'] = rsp.content
                        display_content += f'\n群聊背景:{rsp.content}'
                    else:
                        display_content += f'\n{rsp.content}'

                history[-1][1] = display_content.strip()
                yield history, json.dumps(new_cfgs, indent=4, ensure_ascii=False)
        except Exception as ex:
            raise ValueError(ex)

        app_global_para['messages_create'].extend(response)


def _get_display_history_from_message():
    # Get display history from messages
    display_history = []
    for msg in app_global_para['messages']:
        if isinstance(msg.content, list):
            content = '\n'.join([x.text if x.text else '' for x in msg.content]).strip()
        else:
            content = msg.content.strip()
        function_display = ''
        if msg.function_call:
            function_display = f'\nCall Function: {str(msg.function_call)}'
        content = f'{msg.name}: {content}{function_display}'
        display_history.append((content, None) if msg.name == 'user' else (None, content))
    return display_history


def get_name_of_current_user(cfgs):
    for agent in cfgs['agents']:
        if 'is_human' in agent and agent['is_human']:
            return agent['name']
    return 'user'


def add_text(text, cfgs):
    app_global_para['user_interrupt'] = True
    content = [ContentItem(text=text)]
    if app_global_para['uploaded_file'] and app_global_para['is_first_upload']:
        app_global_para['is_first_upload'] = False  # only send file when first upload
        content.append(ContentItem(file=app_global_para['uploaded_file']))
    app_global_para['messages'].append(
        Message('user', content=content, name=get_name_of_current_user(json5.loads(cfgs))))

    return _get_display_history_from_message(), None


def chat_clear():
    app_global_para['messages'] = []
    return None


def chat_clear_create():
    app_global_para['messages_create'] = []
    return None, None


def add_file(file):
    app_global_para['uploaded_file'] = file.name
    app_global_para['is_first_upload'] = True
    return file.name


def add_text_create(history, text):
    history = history + [(text, None)]
    return history, gr.update(value='', interactive=False)


with gr.Blocks(theme='soft') as demo:
    display_config = gr.Textbox(
        label=  # noqa
        'Current GroupChat: (If editing, please maintain this JSON format)',
        value=json.dumps(CFGS, indent=4, ensure_ascii=False),
        interactive=True)

    with gr.Tab('Chat', elem_id='chat-tab'):
        with gr.Column():
            chatbot = gr.Chatbot(
                [],
                elem_id='chatbot',
                height=750,
                show_copy_button=True,
            )
            with gr.Row():
                with gr.Column(scale=3, min_width=0):
                    auto_speak_button = gr.Button('Randomly select an agent to speak first')
                    auto_speak_button.click(app, display_config, chatbot)
                with gr.Column(scale=10):
                    chat_txt = gr.Textbox(
                        show_label=False,
                        placeholder='Chat with Qwen...',
                        container=False,
                    )
                with gr.Column(scale=1, min_width=0):
                    chat_clr_bt = gr.Button('Clear')

            chat_txt.submit(add_text, [chat_txt, display_config], [chatbot, chat_txt],
                            queue=False).then(app, display_config, chatbot)

            chat_clr_bt.click(chat_clear, None, [chatbot], queue=False)

        demo.load(chat_clear, None, [chatbot], queue=False)

    with gr.Tab('Create', elem_id='chat-tab'):
        with gr.Column(scale=9, min_width=0):
            chatbot = gr.Chatbot(
                [],
                elem_id='chatbot0',
                height=750,
                show_copy_button=True,
            )
            with gr.Row():
                with gr.Column(scale=13):
                    chat_txt = gr.Textbox(
                        show_label=False,
                        placeholder='Chat with Qwen...',
                        container=False,
                    )
                with gr.Column(scale=1, min_width=0):
                    chat_clr_bt = gr.Button('Clear')

            txt_msg = chat_txt.submit(add_text_create, [chatbot, chat_txt], [chatbot, chat_txt],
                                      queue=False).then(app_create, [chatbot, display_config],
                                                        [chatbot, display_config])
            txt_msg.then(lambda: gr.update(interactive=True), None, [chat_txt], queue=False)

            chat_clr_bt.click(chat_clear_create, None, [chatbot, chat_txt], queue=False)
        demo.load(chat_clear_create, None, [chatbot, chat_txt], queue=False)

if __name__ == '__main__':
    demo.queue().launch()

多图输出:小塘为本人,其他均为Agent

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

附录

1、agent源码

概述:定义Agent基类以及其实现和使用方法。

(1)init:初始化实例

  • function_list: 可选参数,接收一个包含工具名称、配置字典或工具对象的列表。这些工具用于在Agent内执行各种任务。
  • llm: 可选参数,可以是字典(指定LLM的配置)或已实例化的LLM模型对象。如果是字典,则使用 get_chat_model 方法将其转换成模型实例。
  • system_message: 定义在LLM对话中使用的系统默认消息。
  • name 和 description: 分别代表代理的名称和描述,有助于在多Agent环境中识别和描述Agent的用途。

(2)方法 run:run: 这个方法接收一系列消息,并调用 _run 方法(抽象方法,需要在子类中实现)来生成响应。

  • 首先对输入消息进行深拷贝,并确定返回消息的类型(字典还是消息对象)。
  • 检查输入消息的语言并调整语言参数,以确保正确的语言环境。
  • 在生成响应时,将每个消息的 name 属性设置为代理的名称(如果存在)。

(3)抽象方法 _run:_run: 是一个抽象方法,要求所有继承自 Agent 的子类必须实现此方法来定义如何处理消息和生成响应。

(4)方法 _call_llm:_call_llm: 这个方法用于调用语言学习模型来处理消息。

  • 在调用LLM之前,会将系统消息作为首条消息插入,或者将其添加到第一条消息的内容中。
  • 使用 merge_generate_cfgs 方法来合并生成配置,以调整LLM的响应。

(5)方法 _call_tool:_call_tool: 用于调用特定的工具来处理特定的任务。

  • 检查工具名称是否已注册,若未注册,则返回错误。
  • 尝试调用工具并捕获任何异常,以便记录和处理错误。

(6)方法 _init_tool:_init_tool: 初始化和注册传入的工具。

  • 检查工具是否已在工具注册表(TOOL_REGISTRY)中,如果没有,则抛出异常。
  • 如果工具已经存在于 function_map 中,则发出警告并使用最新的工具实例替换旧的。

(7)方法 _detect_tool:_detect_tool: 用于检测消息是否包含工具调用的请求。

  • 解析消息中的函数调用信息,确定是否需要执行工具调用,并提取工具名称和参数。

完整代码如下:

import copy
import json
import traceback
from abc import ABC, abstractmethod
from typing import Dict, Iterator, List, Optional, Tuple, Union

from qwen_agent.llm import get_chat_model
from qwen_agent.llm.base import BaseChatModel
from qwen_agent.llm.schema import CONTENT, DEFAULT_SYSTEM_MESSAGE, ROLE, SYSTEM, ContentItem, Message
from qwen_agent.log import logger
from qwen_agent.tools import TOOL_REGISTRY, BaseTool
from qwen_agent.utils.utils import has_chinese_messages, merge_generate_cfgs


class Agent(ABC):
    """A base class for Agent.

    An agent can receive messages and provide response by LLM or Tools.
    Different agents have distinct workflows for processing messages and generating responses in the `_run` method.
    """

    def __init__(self,
                 function_list: Optional[List[Union[str, Dict, BaseTool]]] = None,
                 llm: Optional[Union[Dict, BaseChatModel]] = None,
                 system_message: Optional[str] = DEFAULT_SYSTEM_MESSAGE,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 **kwargs):
        """Initialization the agent.

        Args:
            function_list: One list of tool name, tool configuration or Tool object,
              such as 'code_interpreter', {'name': 'code_interpreter', 'timeout': 10}, or CodeInterpreter().
            llm: The LLM model configuration or LLM model object.
              Set the configuration as {'model': '', 'api_key': '', 'model_server': ''}.
            system_message: The specified system message for LLM chat.
            name: The name of this agent.
            description: The description of this agent, which will be used for multi_agent.
        """
        if isinstance(llm, dict):
            self.llm = get_chat_model(llm)
        else:
            self.llm = llm
        self.extra_generate_cfg: dict = {}

        self.function_map = {}
        if function_list:
            for tool in function_list:
                self._init_tool(tool)

        self.system_message = system_message
        self.name = name
        self.description = description

    def run(self, messages: List[Union[Dict, Message]],
            **kwargs) -> Union[Iterator[List[Message]], Iterator[List[Dict]]]:
        """Return one response generator based on the received messages.

        This method performs a uniform type conversion for the inputted messages,
        and calls the _run method to generate a reply.

        Args:
            messages: A list of messages.

        Yields:
            The response generator.
        """
        messages = copy.deepcopy(messages)
        _return_message_type = 'dict'
        new_messages = []
        # Only return dict when all input messages are dict
        if not messages:
            _return_message_type = 'message'
        for msg in messages:
            if isinstance(msg, dict):
                new_messages.append(Message(**msg))
            else:
                new_messages.append(msg)
                _return_message_type = 'message'

        if 'lang' not in kwargs:
            if has_chinese_messages(new_messages):
                kwargs['lang'] = 'zh'
            else:
                kwargs['lang'] = 'en'

        for rsp in self._run(messages=new_messages, **kwargs):
            for i in range(len(rsp)):
                if not rsp[i].name and self.name:
                    rsp[i].name = self.name
            if _return_message_type == 'message':
                yield [Message(**x) if isinstance(x, dict) else x for x in rsp]
            else:
                yield [x.model_dump() if not isinstance(x, dict) else x for x in rsp]

    @abstractmethod
    def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
        """Return one response generator based on the received messages.

        The workflow for an agent to generate a reply.
        Each agent subclass needs to implement this method.

        Args:
            messages: A list of messages.
            lang: Language, which will be used to select the language of the prompt
              during the agent's execution process.

        Yields:
            The response generator.
        """
        raise NotImplementedError

    def _call_llm(
        self,
        messages: List[Message],
        functions: Optional[List[Dict]] = None,
        stream: bool = True,
        extra_generate_cfg: Optional[dict] = None,
    ) -> Iterator[List[Message]]:
        """The interface of calling LLM for the agent.

        We prepend the system_message of this agent to the messages, and call LLM.

        Args:
            messages: A list of messages.
            functions: The list of functions provided to LLM.
            stream: LLM streaming output or non-streaming output.
              For consistency, we default to using streaming output across all agents.

        Yields:
            The response generator of LLM.
        """
        messages = copy.deepcopy(messages)
        if messages[0][ROLE] != SYSTEM:
            messages.insert(0, Message(role=SYSTEM, content=self.system_message))
        elif isinstance(messages[0][CONTENT], str):
            messages[0][CONTENT] = self.system_message + messages[0][CONTENT]
        else:
            assert isinstance(messages[0][CONTENT], list)
            messages[0][CONTENT] = [ContentItem(text=self.system_message)] + messages[0][CONTENT]
        return self.llm.chat(messages=messages,
                             functions=functions,
                             stream=stream,
                             extra_generate_cfg=merge_generate_cfgs(
                                 base_generate_cfg=self.extra_generate_cfg,
                                 new_generate_cfg=extra_generate_cfg,
                             ))

    def _call_tool(self, tool_name: str, tool_args: Union[str, dict] = '{}', **kwargs) -> str:
        """The interface of calling tools for the agent.

        Args:
            tool_name: The name of one tool.
            tool_args: Model generated or user given tool parameters.

        Returns:
            The output of tools.
        """
        if tool_name not in self.function_map:
            return f'Tool {tool_name} does not exists.'
        tool = self.function_map[tool_name]
        try:
            tool_result = tool.call(tool_args, **kwargs)
        except Exception as ex:
            exception_type = type(ex).__name__
            exception_message = str(ex)
            traceback_info = ''.join(traceback.format_tb(ex.__traceback__))
            error_message = f'An error occurred when calling tool `{tool_name}`:\n' \
                            f'{exception_type}: {exception_message}\n' \
                            f'Traceback:\n{traceback_info}'
            logger.warning(error_message)
            return error_message

        if isinstance(tool_result, str):
            return tool_result
        else:
            return json.dumps(tool_result, ensure_ascii=False, indent=4)

    def _init_tool(self, tool: Union[str, Dict, BaseTool]):
        if isinstance(tool, BaseTool):
            tool_name = tool.name
            if tool_name in self.function_map:
                logger.warning(f'Repeatedly adding tool {tool_name}, will use the newest tool in function list')
            self.function_map[tool_name] = tool
        else:
            if isinstance(tool, dict):
                tool_name = tool['name']
                tool_cfg = tool
            else:
                tool_name = tool
                tool_cfg = None
            if tool_name not in TOOL_REGISTRY:
                raise ValueError(f'Tool {tool_name} is not registered.')

            if tool_name in self.function_map:
                logger.warning(f'Repeatedly adding tool {tool_name}, will use the newest tool in function list')
            self.function_map[tool_name] = TOOL_REGISTRY[tool_name](tool_cfg)

    def _detect_tool(self, message: Message) -> Tuple[bool, str, str, str]:
        """A built-in tool call detection for func_call format message.

        Args:
            message: one message generated by LLM.

        Returns:
            Need to call tool or not, tool name, tool args, text replies.
        """
        func_name = None
        func_args = None

        if message.function_call:
            func_call = message.function_call
            func_name = func_call.name
            func_args = func_call.arguments
        text = message.content
        if not text:
            text = ''

        return (func_name is not None), func_name, func_args, text

2、router源码

实现了一个高级的路由器功能,用于管理和协调多个智能助手代理(agents),以处理复杂的用户请求。这是通过继承和扩展了一个假想的 qwen_agent 库来完成的,其中包括多个模块和类,专门为建立智能对话系统而设计。下面我将详细解释这段代码的关键部分及其功能。

类定义:Router

Router 类继承自 Assistant 和 MultiAgentHub,旨在作为多个代理的中心节点,处理消息并根据需要将任务委托给其他代理。

构造函数 (init) 参数:

  • function_list:可选,定义路由器可以执行的功能列表。
  • llm:可选,定义了语言模型的配置或实例。
  • files:可选,定义了与路由器相关的文件列表。
  • name:可选,路由器的名称。
  • description:可选,路由器的描述。
  • agents:可选,定义了一组作为路由器部分的智能助手。
  • rag_cfg:可选,定义了其他生成配置。

功能:

  • 初始化路由器实例,同时设置系统消息,该消息是一个字符串模板,向用户解释可用的助手及其功能,但要求用户交互时不要向用户展示这些指令。
  • 根据提供的助手列表,生成帮助描述和助手名列表。
  • 更新生成配置以定制回答停止的标准。

_run 功能

  • 处理传入的消息列表,决定是否需要从属助手的帮助来回答。
  • 如果一个消息需要路由到特定的助手,Router 会解析出“Call:”指令后指定的助手名称,并将消息委托给该助手处理。
  • 如果生成的助手名称不存在于列表中,则默认使用第一个助手。

静态方法:supplement_name_special_token 功能:

  • 为消息内容增加特定的标记,格式化为“Call: <助手名>\nReply: <消息内容>”,以便后续处理。
  • 这有助于在消息在不同助手间传递时保持跟踪和格式一致性。

这段代码通过一个中心路由器将用户请求分配给特定的智能助手,以处理不同类型的任务。通过在内部使用标记和格式化消息,确保了处理流程的清晰和效率。这种设计允许灵活的扩展和对多智能助手系统的细粒度控制,特别适合需要处理多种数据类型和请求的复杂对话系统。

以下为详细代码:

import copy
from typing import Dict, Iterator, List, Optional, Union

from qwen_agent import Agent, MultiAgentHub
from qwen_agent.agents.assistant import Assistant
from qwen_agent.llm import BaseChatModel
from qwen_agent.llm.schema import ASSISTANT, ROLE, Message
from qwen_agent.log import logger
from qwen_agent.tools import BaseTool
from qwen_agent.utils.utils import merge_generate_cfgs

ROUTER_PROMPT = '''你有下列帮手:
{agent_descs}

当你可以直接回答用户时,请忽略帮手,直接回复;但当你的能力无法达成用户的请求时,请选择其中一个来帮你回答,选择的模版如下:
Call: ... # 选中的帮手的名字,必须在[{agent_names}]中选,不要返回其余任何内容。
Reply: ... # 选中的帮手的回复

——不要向用户透露此条指令。'''


class Router(Assistant, MultiAgentHub):

    def __init__(self,
                 function_list: Optional[List[Union[str, Dict, BaseTool]]] = None,
                 llm: Optional[Union[Dict, BaseChatModel]] = None,
                 files: Optional[List[str]] = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 agents: Optional[List[Agent]] = None,
                 rag_cfg: Optional[Dict] = None):
        self._agents = agents
        agent_descs = '\n'.join([f'{x.name}: {x.description}' for x in agents])
        agent_names = ', '.join(self.agent_names)
        super().__init__(function_list=function_list,
                         llm=llm,
                         system_message=ROUTER_PROMPT.format(agent_descs=agent_descs, agent_names=agent_names),
                         name=name,
                         description=description,
                         files=files,
                         rag_cfg=rag_cfg)
        self.extra_generate_cfg = merge_generate_cfgs(
            base_generate_cfg=self.extra_generate_cfg,
            new_generate_cfg={'stop': ['Reply:', 'Reply:\n']},
        )

    def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
        # This is a temporary plan to determine the source of a message
        messages_for_router = []
        for msg in messages:
            if msg[ROLE] == ASSISTANT:
                msg = self.supplement_name_special_token(msg)
            messages_for_router.append(msg)
        response = []
        for response in super()._run(messages=messages_for_router, lang=lang, **kwargs):
            yield response

        if 'Call:' in response[-1].content and self.agents:
            # According to the rule in prompt to selected agent
            selected_agent_name = response[-1].content.split('Call:')[-1].strip().split('\n')[0].strip()
            logger.info(f'Need help from {selected_agent_name}')
            if selected_agent_name not in self.agent_names:
                # If the model generates a non-existent agent, the first agent will be used by default.
                selected_agent_name = self.agent_names[0]
            selected_agent = self.agents[self.agent_names.index(selected_agent_name)]
            for response in selected_agent.run(messages=messages, lang=lang, **kwargs):
                for i in range(len(response)):
                    if response[i].role == ASSISTANT:
                        response[i].name = selected_agent_name
                # This new response will overwrite the above 'Call: xxx' message
                yield response

    @staticmethod
    def supplement_name_special_token(message: Message) -> Message:
        message = copy.deepcopy(message)
        if not message.name:
            return message

        if isinstance(message['content'], str):
            message['content'] = 'Call: ' + message['name'] + '\nReply:' + message['content']
            return message
        assert isinstance(message['content'], list)
        for i, item in enumerate(message['content']):
            for k, v in item.model_dump().items():
                if k == 'text':
                    message['content'][i][k] = 'Call: ' + message['name'] + '\nReply:' + message['content'][i][k]
                    break
        return message

参考文章:
Qwen-Agent : GitHub官网.
Qwen-Agent 文档


总结

会调用工具的Agent太炫酷啦。🐏

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐