How to stream data from within a tool
If your graph calls tools that use LLMs or any other streaming APIs, you might want to surface partial results during the execution of the tool, especially if the tool takes longer to run.
- To stream **arbitrary** data from inside a tool, you can use `stream_mode="custom"` together with `get_stream_writer()`; see the sketch after the next example.
- To stream LLM tokens generated by a tool calling an LLM, you can use `stream_mode="messages"`:

```python
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

def tool(tool_arg: str):
    model.invoke(tool_arg)
    ...

def call_tools(state: MessagesState):
    tool_call = get_tool_call(state)
    tool_result = tool(**tool_call["args"])
    ...

graph = (
    StateGraph(MessagesState)
    .add_node(call_tools)
    ...
    .compile()
)

for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages"
):
    print(msg)
```
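For the first option, here is a minimal sketch with the same elisions as the example above (the tool body is a placeholder for whatever streaming logic you have). `get_stream_writer()` returns a callable, and anything you pass to it is emitted on the `custom` stream:

```python
from langgraph.config import get_stream_writer

def tool(tool_arg: str):
    writer = get_stream_writer()
    # anything passed to the writer shows up on the "custom" stream
    writer({"custom_tool_data": tool_arg})
    ...

for chunk in graph.stream(
    inputs,
    stream_mode="custom"
):
    print(chunk)
```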
Using without LangChain

If you need to stream data from inside a tool **without using LangChain**, you can use `stream_mode="custom"`. Check out the example below to learn more.
Async with Python < 3.11

When using async code with Python < 3.11, make sure to manually pass the `RunnableConfig` through when invoking a chat model, like so: `model.ainvoke(..., config)`. The `stream` method collects all events from your nested code using a streaming tracer passed as a callback. On Python 3.11 and above, this is handled automatically via contextvars; before 3.11, asyncio's tasks lacked proper contextvar support, which means callbacks will only propagate if you pass the config through manually. We do this in the `get_items` tool below.
Setup

First, let's install the required packages and set our API keys:
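The guide doesn't pin exact packages, so the following install line is an assumption based on the imports used below (`langgraph`, `langchain_openai`, and the `openai` SDK for the final section):

```python
%pip install -U langgraph langchain_openai openai
```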
```python
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
```
Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph.
Streaming custom data

In this guide, we'll be using a prebuilt ReAct agent:
```python
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer


@tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()

    # this can be replaced with any actual streaming logic that you might have
    items = ["books", "pencils", "pictures"]
    for chunk in items:
        writer({"custom_tool_data": chunk})

    return ", ".join(items)


llm = ChatOpenAI(model_name="gpt-4o-mini")
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)
```
Now, let's invoke our agent with an input that requires a tool call:
```python
inputs = {
    "messages": [
        {"role": "user", "content": "what items are in the office?"}
    ]
}

async for chunk in agent.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk)
```
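Because the items list is hardcoded and the writer is called once per item, you should see one dictionary per `writer` call on the custom stream (assuming the model decides to call the tool), along the lines of:

```
{'custom_tool_data': 'books'}
{'custom_tool_data': 'pencils'}
{'custom_tool_data': 'pictures'}
```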
Streaming LLM tokens
```python
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import RunnableConfig


@tool
async def get_items(
    place: str,
    # Manually accept config (needed for Python <= 3.10)
    config: RunnableConfig,
) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    # Attention: when using async, you should be invoking the LLM using ainvoke!
    # If you fail to do so, streaming will NOT work.
    response = await llm.ainvoke(
        [
            {
                "role": "user",
                "content": (
                    f"Can you tell me what kind of items I might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. And include a brief description of each item."
                ),
            }
        ],
        config,
    )
    return response.content


tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)
```
```python
inputs = {
    "messages": [
        {"role": "user", "content": "what items are in the bedroom?"}
    ]
}

async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # Stream all messages from the tool node
        and metadata["langgraph_node"] == "tools"
    ):
        print(msg.content, end="|", flush=True)
```
```
Certainly|!| Here| are| three| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| supported| by| a| frame|.| It| is| designed| for| sleeping| and| can| vary| in| size| from| twin| to| king|.| Beds| often| have| bedding|,| including| sheets|,| pillows|,| and| comfort|ers|,| to| enhance| comfort|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| drawers| used| for| storing| clothing| and| personal| items|.| Dress|ers| often| have| a| flat| surface| on| top|,| which| can| be| used| for| decorative| items|,| a| mirror|,| or| personal| accessories|.| They| help| keep| the| bedroom| organized| and| clutter|-free|.

|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| such| as| a| lamp|,| alarm| clock|,| books|,| or| personal| items|.| Night|stands| provide| convenience| for| easy| access| to| essentials| during| the| night|,| adding| functionality| and| style| to| the| bedroom| decor|.|
```
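The filter above keeps only tokens emitted from inside the `tools` node. The prebuilt agent's other node is named `agent` (see the comment where `create_react_agent` is called), so a minimal variation, assuming the same `agent` and `inputs` as above, is to swap the node name in the filter to stream the model's own tokens (e.g. its final answer) instead:

```python
async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # keep only tokens produced by the `agent` node
        and metadata["langgraph_node"] == "agent"
    ):
        print(msg.content, end="|", flush=True)
```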
Example without LangChain

You can also stream data from within tool invocations without using LangChain. The example below demonstrates how to do it for a graph with a single tool-executing node. We'll leave implementing a ReAct agent from scratch without LangChain as an exercise for the reader.
```python
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated

from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role
        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"I might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)
    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
```
Now, let's invoke our graph with an AI message that includes a tool call:
```python
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
```
```
Sure|!| Here| are| three| common| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| focal| point| of| the| bedroom|,| a| bed| typically| consists| of| a| mattress| resting| on| a| frame|,| and| it| may| include| pillows| and| bedding|.| It| provides| a| comfortable| place| for| sleeping| and| resting|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| multiple| drawers|,| a| dresser| is| used| for| storing| clothes|,| accessories|,| and| personal| items|.| It| often| has| a| flat| surface| that| may| be| used| to| display| decorative| items| or| a| mirror|.

|3|.| **|Night|stand|**|:| Also| known| as| a| bedside| table|,| a| night|stand| is| placed| next| to| the| bed| and| typically| holds| items| like| lamps|,| books|,| alarm| clocks|,| and| personal| belongings| for| convenience| during| the| night|.

|These| items| contribute| to| the| functionality| and| comfort| of| the| bedroom| environment|.|
```
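If you also want the fully accumulated tool message alongside the token-by-token chunks, one option is to pass several stream modes at once, in which case LangGraph yields `(mode, chunk)` tuples. A sketch, assuming the same `graph` and `inputs` as above:

```python
async for mode, chunk in graph.astream(
    inputs,
    stream_mode=["custom", "updates"],
):
    if mode == "custom":
        # token-level chunks emitted by the tool via the stream writer
        print(chunk["content"], end="|", flush=True)
    else:
        # "updates" chunks contain per-node state updates,
        # including the final tool message returned by `call_tool`
        print(chunk)
```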