
How to stream data from within a tool

Prerequisites

This guide assumes familiarity with streaming and tools in LangGraph.

If your graph calls tools that use LLMs or other streaming APIs, you may want to surface partial results during tool execution, especially if the tool takes longer to run.

  1. To stream **arbitrary** data from inside a tool, use `stream_mode="custom"` and `get_stream_writer()`:

    from langgraph.config import get_stream_writer
    
    def tool(tool_arg: str):
        writer = get_stream_writer()
        for chunk in custom_data_stream():
            # stream any arbitrary data
            writer(chunk)
        ...
    
    for chunk in graph.stream(
        inputs,
        stream_mode="custom"
    ):
        print(chunk)
    
  2. To stream LLM tokens generated by an LLM called inside a tool, use `stream_mode="messages"`:

    from langgraph.graph import StateGraph, MessagesState
    from langchain_openai import ChatOpenAI
    
    model = ChatOpenAI()
    
    def tool(tool_arg: str):
        model.invoke(tool_arg)
        ...
    
    def call_tools(state: MessagesState):
        tool_call = get_tool_call(state)
        tool_result = tool(**tool_call["args"])
        ...
    
    graph = (
        StateGraph(MessagesState)
        .add_node(call_tools)
        ...
        .compile()
    )
    
    for msg, metadata in graph.stream(
        inputs,
        stream_mode="messages"
    ):
        print(msg)
    

Without LangChain

If you need to stream data from within tools **without using LangChain**, you can use `stream_mode="custom"`. See the example below to learn more.

Async in Python < 3.11

When using Python < 3.11 with async code, make sure to manually pass a RunnableConfig when invoking the model, like so: `model.ainvoke(..., config)`. The streaming methods collect all events from your nested code using a streaming tracer passed as a callback. In Python 3.11 and above, this is handled automatically via contextvars; before 3.11, asyncio tasks lacked proper contextvar support, which means callbacks will only propagate if you pass the config through manually. We do this in the `get_items` tool below.
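
As a minimal sketch of this pattern (the tool and model names here are placeholders, not part of this guide's example):

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI

model = ChatOpenAI()


async def my_tool(tool_arg: str, config: RunnableConfig) -> str:
    # On Python < 3.11, asyncio tasks don't propagate contextvars, so the
    # streaming callbacks only reach this nested call if `config` is passed
    # through explicitly.
    response = await model.ainvoke(tool_arg, config)
    return response.content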

Setup

First, let's install the required packages and set our API keys:

%%capture --no-stderr
%pip install -U langgraph langchain-openai

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
OPENAI_API_KEY:  ········

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started here.

Streaming custom data

We'll use a prebuilt ReAct agent for this guide:

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer


@tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()

    # this can be replaced with any actual streaming logic that you might have
    items = ["books", "penciles", "pictures"]
    for chunk in items:
        writer({"custom_tool_data": chunk})

    return ", ".join(items)


llm = ChatOpenAI(model_name="gpt-4o-mini")
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: tool | ChatOpenAI | create_react_agent

Now let's invoke our agent with an input that requires a tool call:

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the office?"}
    ]
}
async for chunk in agent.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk)
{'custom_tool_data': 'books'}
{'custom_tool_data': 'pencils'}
{'custom_tool_data': 'pictures'}
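
If you also want the agent's node updates alongside the custom tool data, `stream_mode` accepts a list of modes; each streamed item is then a `(mode, chunk)` tuple. A brief sketch reusing the `inputs` above:

# When `stream_mode` is a list, each streamed item is a (mode, chunk) tuple,
# so custom tool data and node updates can be consumed from a single stream.
async for mode, chunk in agent.astream(
    inputs,
    stream_mode=["updates", "custom"],
):
    if mode == "custom":
        print("tool data:", chunk)
    else:
        print("node update:", chunk)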

Streaming LLM tokens

from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import RunnableConfig


@tool
async def get_items(
    place: str,
    # Manually accept config (needed for Python <= 3.10)
    config: RunnableConfig,
) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    # Attention: when using async, you should be invoking the LLM using ainvoke!
    # If you fail to do so, streaming will NOT work.
    response = await llm.ainvoke(
        [
            {
                "role": "user",
                "content": (
                    f"Can you tell me what kind of items i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. And include a brief description of each item."
                ),
            }
        ],
        config,
    )
    return response.content


tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: AIMessageChunk | RunnableConfig

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the bedroom?"}
    ]
}
async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # Stream all messages from the tool node
        and metadata["langgraph_node"] == "tools"
    ):
        print(msg.content, end="|", flush=True)
Certainly|!| Here| are| three| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| supported| by| a| frame|.| It| is| designed| for| sleeping| and| can| vary| in| size| from| twin| to| king|.| Beds| often| have| bedding|,| including| sheets|,| pillows|,| and| comfort|ers|,| to| enhance| comfort|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| drawers| used| for| storing| clothing| and| personal| items|.| Dress|ers| often| have| a| flat| surface| on| top|,| which| can| be| used| for| decorative| items|,| a| mirror|,| or| personal| accessories|.| They| help| keep| the| bedroom| organized| and| clutter|-free|.

|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| such| as| a| lamp|,| alarm| clock|,| books|,| or| personal| items|.| Night|stands| provide| convenience| for| easy| access| to| essentials| during| the| night|,| adding| functionality| and| style| to| the| bedroom| decor|.|
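
Filtering on `langgraph_node` works here because only the tools node emits these tokens. If several LLM calls ran inside the same node, one alternative is to tag the tool's LLM and filter on the tag instead. A sketch, assuming the streamed metadata carries the run's tags (the `tool_llm` tag name is arbitrary, and `get_items` would need to invoke `tool_llm` instead of `llm` for this to apply):

# Tag the tool's LLM so its tokens are distinguishable from other LLM calls.
tool_llm = llm.with_config(tags=["tool_llm"])

async for msg, metadata in agent.astream(inputs, stream_mode="messages"):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # Filter on the tag rather than the node name
        and "tool_llm" in metadata.get("tags", [])
    ):
        print(msg.content, end="|", flush=True)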

Example without LangChain

You can also stream data from within tool invocations **without LangChain**. The example below demonstrates how to do so for a graph with a single tool-executing node. We leave implementing a ReAct agent from scratch without LangChain as an exercise for the reader.

import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)  
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)

API Reference: StateGraph | START

Now let's invoke our graph with an AI message that includes a tool call:

inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
Sure|!| Here| are| three| common| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| focal| point| of| the| bedroom|,| a| bed| typically| consists| of| a| mattress| resting| on| a| frame|,| and| it| may| include| pillows| and| bedding|.| It| provides| a| comfortable| place| for| sleeping| and| resting|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| multiple| drawers|,| a| dresser| is| used| for| storing| clothes|,| accessories|,| and| personal| items|.| It| often| has| a| flat| surface| that| may| be| used| to| display| decorative| items| or| a| mirror|.

|3|.| **|Night|stand|**|:| Also| known| as| a| bedside| table|,| a| night|stand| is| placed| next| to| the| bed| and| typically| holds| items| like| lamps|,| books|,| alarm| clocks|,| and| personal| belongings| for| convenience| during| the| night|.

|These| items| contribute| to| the| functionality| and| comfort| of| the| bedroom| environment|.|
