构建LangChain应用程序的示例代码:11、构建一个能够查询Postgres数据库并运行Python代码来分析检索到的数据的代理
现在我们准备定义我们的应用程序逻辑。核心元素是代理状态、节点和边。由于我们的代码解释器可以返回像base64编码的图像这样的结果,我们不想将它们传回模型,我们将创建一个自定义的工具消息,允许我们在不将它们传回模型的情况下传递原始工具输出。自定义的工具消息,允许我们传递原始工具输出(以及用于传回模型的字符串内容)。"""首先,我们将定义一个用于调用我们模型的节点。我们需要确保将我们的工具绑定到模型上
构建数据分析师代理:使用LangGraph和Azure Container Apps动态会话
在这个示例中,我们将构建一个能够查询Postgres数据库并运行Python代码来分析检索到的数据的代理。我们将使用LangGraph进行代理协调,并使用Azure Container Apps动态会话来安全地执行Python代码。
注意:构建与SQL数据库交互的大型语言模型(LLM)系统需要执行模型生成的SQL查询。这样做存在固有风险。确保您的数据库连接权限始终尽可能地限制在代理需求的范围内。这将减轻但不能完全消除构建模型驱动系统的风险。有关一般安全最佳实践的更多信息,请参见我们的安全指南。
安装
让我们通过安装Python依赖项并设置我们的OpenAI凭据、Azure Container Apps会话池端点和我们的SQL数据库连接字符串来开始设置。
安装依赖项
%pip install -qU langgraph langchain-azure-dynamic-sessions langchain-openai langchain-community pandas matplotlib
设置凭据
默认情况下,此演示使用:
- Azure OpenAI用于模型:Azure OpenAI 创建资源
- Azure PostgreSQL用于数据库:Azure CLI 创建PostgreSQL服务器
- Azure Container Apps动态会话用于代码执行:Azure Container Apps 代码解释器会话
此LangGraph架构也可以与任何其他调用工具的LLM和任何SQL数据库一起使用。
import getpass
import os
os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Azure OpenAI API key")
os.environ["AZURE_OPENAI_ENDPOINT"] = getpass.getpass("Azure OpenAI endpoint")
AZURE_OPENAI_DEPLOYMENT_NAME = getpass.getpass("Azure OpenAI deployment name")
SESSIONS_POOL_MANAGEMENT_ENDPOINT = getpass.getpass(
"Azure Container Apps dynamic sessions pool management endpoint"
)
SQL_DB_CONNECTION_STRING = getpass.getpass("PostgreSQL connection string")
导入
import ast
import base64
import io
import json
import operator
from functools import partial
from typing import Annotated, List, Literal, Optional, Sequence, TypedDict
import pandas as pd
from IPython.display import display
from langchain_azure_dynamic_sessions import SessionsPythonREPLTool
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnablePassthrough
from langchain_core.tools import tool
from langchain_openai import AzureChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from matplotlib.pyplot import imshow
from PIL import Image
实例化模型、数据库、代码解释器
我们将使用LangChain SQLDatabase接口连接到我们的数据库并查询它。这适用于任何受SQLAlchemy支持的SQL数据库。
db = SQLDatabase.from_uri(SQL_DB_CONNECTION_STRING)
对于我们的LLM,我们需要确保我们使用的模型支持工具调用。
llm = AzureChatOpenAI(
deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME, openai_api_version="2024-02-01"
)
动态会话工具是我们用于代码执行的工具。
repl = SessionsPythonREPLTool(
pool_management_endpoint=SESSIONS_POOL_MANAGEMENT_ENDPOINT
)
定义图
现在我们准备定义我们的应用程序逻辑。核心元素是代理状态、节点和边。
定义状态
我们将使用一个简单的代理状态,它只是每个节点都可以追加的消息列表:
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
由于我们的代码解释器可以返回像base64编码的图像这样的结果,我们不想将它们传回模型,我们将创建一个自定义的工具消息,允许我们在不将它们传回模型的情况下传递原始工具输出。
class RawToolMessage(ToolMessage):
"""
自定义的工具消息,允许我们传递原始工具输出(以及用于传回模型的字符串内容)。
"""
定义节点
首先,我们将定义一个用于调用我们模型的节点。我们需要确保将我们的工具绑定到模型上,以便它知道要调用它们。我们还将在我们的提示中指定模型可以访问的SQL表的模式,以便它可以编写相关的SQL查询。
我们将使用我们模型的工具调用能力可靠地生成我们的SQL查询和Python代码。为此,我们需要为我们的工具定义模式,以便模型可以使用它们来构建其工具调用。
注意,类名、文档字符串和属性类型和描述在这里至关重要,因为它们实际上被传递到模型中(您可以有效地将它们视为提示的一部分)。
# 查询SQL数据库的工具模式
class create_df_from_sql(BaseModel):
"""执行一个PostgreSQL SELECT语句,并使用结果创建一个带有给定列名的数据帧。"""
# 编写Python代码的工具模式
class python_shell(BaseModel):
"""执行分析已生成的数据帧的Python代码。确保打印任何重要的结果。"""
system_prompt = f"""你是一个PostgreSQL和Python的专家。你可以访问一个PostgreSQL数据库,数据库中包含以下表格:
{db.table_info}
给定一个与数据库中数据相关的用户问题,首先使用create_df_from_sql工具从表中获取相关数据作为数据帧。然后使用python_shell进行任何必要的分析以回答用户问题。"""
prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
("placeholder", "{messages}"),
]
)
def call_model(state: AgentState) -> dict:
"""使用传入的工具调用模型。"""
messages = []
现在我们可以定义一个节点来执行模型生成的任何SQL查询。注意,在我们运行查询之后,我们将结果转换为Pandas DataFrames — 这些将在下一步上传到代码解释器工具,以便它可以使用检索到的数据。
def execute_sql_query(state: AgentState) -> dict:
"""执行最新的SQL查询。"""
messages = []
现在我们需要一个节点来执行任何模型生成的Python代码。关键步骤如下:
- 将查询数据上传到代码解释器
- 执行模型生成的代码
- 解析结果,以便图像被显示而不是传入到未来的模型调用中
要将查询数据上传到模型,我们可以将我们通过执行SQL查询生成的DataFrames作为CSV上传到我们的代码解释器。
def _upload_dfs_to_repl(state: AgentState) -> str:
"""
将生成的dfs上传到代码解释器并返回加载它们的代码。
def _repl_result_to_msg_content(repl_result: dict) -> str:
"""
通过在工具消息内容中包含它们来显示图像。
content = {}
for k, v in repl_result.items():
# 任何图像结果都以以下形式返回:
# {"type": "image", "base64_data": "..."}
if isinstance(repl_result[k], dict) and repl_result[k]["type"] == "image":
# 解码并显示图像
base64_str = repl_result[k]["base64_data"]
img = Image.open(io.BytesIO(base64.decodebytes(bytes(base64_str, "utf-8"))))
display(img)
else:
content[k] = repl_result[k]
return json.dumps(content, indent=2)
def execute_python(state: AgentState) -> dict:
"""
执行最新生成的Python代码。
"""
messages = []
定义边
现在我们将所有部分组合成一个图。
def should_continue(state: AgentState) -> str:
"""
如果上一个周期生成了任何工具消息,这意味着我们需要再次调用模型来解释最新结果。
"""
return "execute_sql_query" if state["messages"][-1].tool_calls else END
workflow = StateGraph(AgentState)
workflow.add_node("call_model", call_model)
workflow.add_node("execute_sql_query", execute_sql_query)
workflow.add_node("execute_python", execute_python)
workflow.set_entry_point("call_model")
workflow.add_edge("execute_sql_query", "execute_python")
workflow.add_edge("execute_python", "call_model")
workflow.add_conditional_edges("call_model", should_continue)
app = workflow.compile()
print(app.get_graph().draw_ascii())
测试
用与您连接代理的数据库相关的问题替换这些示例。
output = app.invoke({"messages": [("human", "graph the average latency by model")]})
print(output["messages"][-1].content)
LangSmith Trace: https://smith.langchain.com/public/9c8afcce-0ed1-4fb1-b719-767e6432bd8e/r
output = app.invoke(
{
"messages": [
("human", "what's the relationship between latency and input tokens?")
]
}
)
print(output["messages"][-1].content)
output = app.invoke(
{
"messages": output["messages"] + [("human", "now control for model")]
}
)
print(output["messages"][-1].content)
output = app.invoke(
{
"messages": output["messages"]
+ [("human", "what about latency vs output tokens")]
}
)
print(output["messages"][-1].content)
output = app.invoke(
{
"messages": [
(
"human",
"what's the better explanatory variable for latency: input or output tokens?",
)
]
}
)
print(output["messages"][-1].content)
本文介绍了如何构建一个数据分析师代理,该代理能够查询Postgres数据库并通过Python代码分析数据。通过使用LangGraph进行代理协调和Azure Container Apps动态会话来安全执行代码,我们能够创建一个强大的数据分析工具。该系统还强调了与SQL数据库交互时的安全最佳实践,确保数据库连接权限尽可能地限制在代理需求的范围内。通过定义清晰的工具模式和节点,我们可以构建一个能够自动执行SQL查询和Python代码的复杂工作流,从而实现高效的数据分析和可视化。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)