Skip to content

如何进行流式传输

前提条件

本指南假设你熟悉以下内容:

流式处理对于提升基于大语言模型(LLMs)构建的应用程序的响应能力至关重要。通过在完整响应准备好之前逐步显示输出,流式处理显著改善了用户体验(UX),尤其是在处理大语言模型的延迟问题时。

LangGraph 原生支持流式处理。有几种不同的方式可以从图运行中流式返回输出:

  • "values":在每一步之后发出状态中的所有值。
  • "updates":在每一步之后仅发出节点名称和节点返回的更新。 如果在同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新将分别发出。
  • "custom":使用 StreamWriter 从节点内部发出自定义数据。
  • "messages":逐令牌发出大语言模型消息,并附带节点内任何大语言模型调用的元数据。
  • "debug":为每一步发出包含尽可能多信息的调试事件。

你可以使用 graph.stream(..., stream_mode=<stream_mode>) 方法从图中流式输出,例如:

for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
async for chunk in graph.astream(inputs, stream_mode="updates"):
    print(chunk)

你还可以通过向 stream_mode 参数提供一个列表来组合多种流式处理模式:

for chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)
async for chunk in graph.astream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

设置

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
OPENAI_API_KEY:  ········

为 LangGraph 开发设置 LangSmith

注册 LangSmith,以便快速发现问题并提升你的 LangGraph 项目的性能。LangSmith 允许你使用跟踪数据来调试、测试和监控使用 LangGraph 构建的大语言模型应用程序 — 点击 此处 了解更多关于如何开始使用的信息。

让我们定义一个包含两个节点的简单图:

定义图

from typing import TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

流式传输状态中的所有值(stream_mode="values")

使用此功能在每一步之后流式传输状态中的**所有值**。

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="values",
):
    print(chunk)
{'topic': 'ice cream'}
{'topic': 'ice cream and cats'}
{'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}

从节点流式传输状态更新(stream_mode="updates")

使用此功能仅流式传输每个步骤后节点返回的**状态更新**。流式输出包括节点名称以及更新内容。

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk)
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

流调试事件(stream_mode="debug")

使用此功能来流式传输**调试事件**,为每个步骤提供尽可能多的信息。其中包括已安排执行的任务的相关信息以及任务执行的结果。

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    print(chunk)
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.789803+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'input': {'topic': 'ice cream'}, 'triggers': ['start:refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790013+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'error': None, 'result': [('topic', 'ice cream and cats')], 'interrupts': []}}
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.790165+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'input': {'topic': 'ice cream and cats'}, 'triggers': ['refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790337+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'error': None, 'result': [('joke', 'This is a joke about ice cream and cats')], 'interrupts': []}}

流式传输大语言模型(LLM)的令牌(stream_mode="messages"

使用此功能可逐令牌流式传输 大语言模型(LLM)消息,同时为节点或任务内的任何大语言模型调用附带元数据。让我们修改上述示例以包含大语言模型调用:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")


def generate_joke(state: State):
    llm_response = llm.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state['topic']}"}
        ]
    )
    return {"joke": llm_response.content}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
Why| did| the| cat| sit| on| the| ice| cream| cone|?

|Because| it| wanted| to| be| a| "|p|urr|-f|ect|"| scoop|!| 🍦|🐱|

metadata
{'langgraph_step': 2,
 'langgraph_node': 'generate_joke',
 'langgraph_triggers': ['refine_topic'],
 'langgraph_path': ('__pregel_pull', 'generate_joke'),
 'langgraph_checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
 'checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
 'ls_provider': 'openai',
 'ls_model_name': 'gpt-4o-mini',
 'ls_model_type': 'chat',
 'ls_temperature': 0.7}

流式传输自定义数据(stream_mode="custom")

使用此功能通过 StreamWriter 从节点内部流式传输自定义数据。

from langgraph.types import StreamWriter


def generate_joke(state: State, writer: StreamWriter):
    writer({"custom_key": "Writing custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="custom",
):
    print(chunk)
{'custom_key': 'Writing custom data while generating a joke'}

配置多种流模式

使用此功能可组合多个流式传输模式。输出将以元组 (stream_mode, streamed_output) 的形式进行流式传输。

for stream_mode, chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
):
    print(f"Stream mode: {stream_mode}")
    print(chunk)
    print("\n")
Stream mode: updates
{'refine_topic': {'topic': 'ice cream and cats'}}


Stream mode: custom
{'custom_key': 'Writing custom data while generating a joke'}


Stream mode: updates
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

Comments