
How to stream

Prerequisites

This guide assumes familiarity with the following:

Streaming is crucial for making LLM-based applications feel responsive. By displaying output progressively, even before a complete response is ready, streaming significantly improves the user experience (UX), particularly in the face of LLM latency.

LangGraph has first-class, built-in support for streaming. There are several different ways to stream outputs from a graph run:

  • "values": Emit all values in the state after each step.
  • "updates": Emit only the node names and the updates returned by the nodes after each step. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are emitted separately.
  • "custom": Emit custom data from inside nodes using StreamWriter.
  • "messages": Emit LLM messages token-by-token, together with metadata for any LLM invocations inside nodes.
  • "debug": Emit debug events with as much information as possible for each step.

You can stream outputs from the graph using the graph.stream(..., stream_mode=<stream_mode>) method, e.g.:

# Sync
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)

# Async
async for chunk in graph.astream(inputs, stream_mode="updates"):
    print(chunk)

You can also combine multiple streaming modes by providing a list to the stream_mode parameter:

# Sync
for chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

# Async
async for chunk in graph.astream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

Set up the environment

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
OPENAI_API_KEY:  ········

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph; read more about how to get started here.
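If you want tracing enabled for the examples below, you can set the relevant environment variables up front. A minimal sketch, assuming the commonly used LangSmith variable names (check your LangSmith settings for the exact names, and substitute your real API key):

```python
import os

# Hypothetical values for illustration; use your real LangSmith API key.
os.environ["LANGCHAIN_TRACING_V2"] = "true"  # enable tracing
os.environ["LANGCHAIN_API_KEY"] = "<your-langsmith-api-key>"
os.environ["LANGCHAIN_PROJECT"] = "streaming-how-to"  # optional project name
```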

Let's define a simple graph with two nodes:

Define the graph

from typing import TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

API Reference: StateGraph | START

Stream all values in the state (stream_mode="values")

Use this to stream **all values** in the state after each step.

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="values",
):
    print(chunk)
{'topic': 'ice cream'}
{'topic': 'ice cream and cats'}
{'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}

Stream state updates from the nodes (stream_mode="updates")

Use this to stream only the **state updates** returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk)
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
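The relationship between "updates" and "values" can be sketched in plain Python (no graph required): each "values" snapshot is simply the previous state merged with the partial update a node returned. The sketch below replays the two updates streamed above:

```python
# The input state plus the two updates streamed above.
state = {"topic": "ice cream"}
updates = [
    {"refine_topic": {"topic": "ice cream and cats"}},
    {"generate_joke": {"joke": "This is a joke about ice cream and cats"}},
]

snapshots = [dict(state)]
for update in updates:
    for node_name, delta in update.items():
        state = {**state, **delta}  # merge the node's partial update
    snapshots.append(dict(state))

for snapshot in snapshots:
    print(snapshot)
```

This reproduces the three chunks emitted by stream_mode="values" earlier.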

Stream debug events (stream_mode="debug")

Use this to stream **debug events** with as much information as possible, including information about the tasks scheduled for execution as well as the results of the task executions.

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    print(chunk)
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.789803+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'input': {'topic': 'ice cream'}, 'triggers': ['start:refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790013+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'error': None, 'result': [('topic', 'ice cream and cats')], 'interrupts': []}}
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.790165+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'input': {'topic': 'ice cream and cats'}, 'triggers': ['refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790337+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'error': None, 'result': [('joke', 'This is a joke about ice cream and cats')], 'interrupts': []}}
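Since every debug event is a dict with a "type" field, it is straightforward to filter the stream for just the pieces you care about. A minimal sketch over two sample events shaped like the output above (payloads abbreviated):

```python
# Two events shaped like the debug output above (payloads abbreviated).
events = [
    {"type": "task", "step": 1,
     "payload": {"name": "refine_topic", "input": {"topic": "ice cream"}}},
    {"type": "task_result", "step": 1,
     "payload": {"name": "refine_topic", "error": None,
                 "result": [("topic", "ice cream and cats")]}},
]

# Collect each node's result as a dict, skipping the scheduling events.
results = {
    e["payload"]["name"]: dict(e["payload"]["result"])
    for e in events
    if e["type"] == "task_result"
}
print(results)  # {'refine_topic': {'topic': 'ice cream and cats'}}
```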

Stream LLM tokens (stream_mode="messages")

Use this to stream **LLM messages token-by-token**, together with metadata for any LLM invocations inside nodes or tasks. Let's modify the above example to include an LLM call:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")


def generate_joke(state: State):
    llm_response = llm.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state['topic']}"}
        ]
    )
    return {"joke": llm_response.content}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

API Reference: ChatOpenAI

for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
Why| did| the| cat| sit| on| the| ice| cream| cone|?

|Because| it| wanted| to| be| a| "|p|urr|-f|ect|"| scoop|!| 🍦|🐱|

metadata
{'langgraph_step': 2,
 'langgraph_node': 'generate_joke',
 'langgraph_triggers': ['refine_topic'],
 'langgraph_path': ('__pregel_pull', 'generate_joke'),
 'langgraph_checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
 'checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
 'ls_provider': 'openai',
 'ls_model_name': 'gpt-4o-mini',
 'ls_model_type': 'chat',
 'ls_temperature': 0.7}
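When a graph has multiple LLM-calling nodes, this metadata is what lets you route or filter tokens by node. A plain-Python sketch (FakeChunk is a hypothetical stand-in for the message chunk objects the real stream yields) that keeps only the tokens emitted from the generate_joke node:

```python
from dataclasses import dataclass


@dataclass
class FakeChunk:
    """Stand-in for the message chunk objects yielded by the real stream."""
    content: str


# (chunk, metadata) pairs shaped like what stream_mode="messages" yields.
stream = [
    (FakeChunk("Why"), {"langgraph_node": "generate_joke"}),
    (FakeChunk(" did"), {"langgraph_node": "generate_joke"}),
    (FakeChunk("ignored"), {"langgraph_node": "some_other_node"}),
]

joke = "".join(
    chunk.content
    for chunk, metadata in stream
    if metadata["langgraph_node"] == "generate_joke" and chunk.content
)
print(joke)  # Why did
```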

Stream custom data (stream_mode="custom")

Use this to stream custom data from inside nodes using StreamWriter.

from langgraph.types import StreamWriter


def generate_joke(state: State, writer: StreamWriter):
    writer({"custom_key": "Writing custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

API Reference: StreamWriter

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="custom",
):
    print(chunk)
{'custom_key': 'Writing custom data while generating a joke'}
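The writer is just a callable, so a node can emit as many custom chunks as it likes, e.g. progress updates. A plain-Python sketch (using a list's append in place of the real injected StreamWriter) of how such a node behaves:

```python
def generate_joke_with_progress(state, writer):
    # In LangGraph, `writer` would be the injected StreamWriter;
    # here any callable works for illustration.
    for stage in ("drafting", "polishing"):
        writer({"progress": stage})
    return {"joke": f"This is a joke about {state['topic']}"}


received = []
result = generate_joke_with_progress({"topic": "cats"}, received.append)
print(received)  # [{'progress': 'drafting'}, {'progress': 'polishing'}]
print(result)    # {'joke': 'This is a joke about cats'}
```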

Configure multiple streaming modes

Use this to combine multiple streaming modes. The outputs are streamed as tuples (stream_mode, streamed_output).

for stream_mode, chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
):
    print(f"Stream mode: {stream_mode}")
    print(chunk)
    print("\n")
Stream mode: updates
{'refine_topic': {'topic': 'ice cream and cats'}}


Stream mode: custom
{'custom_key': 'Writing custom data while generating a joke'}


Stream mode: updates
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
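Because each item is a (stream_mode, chunk) tuple, a common pattern is to dispatch on the mode. A minimal sketch over tuples shaped like the output above:

```python
# Tuples shaped like the combined-mode output above.
chunks = [
    ("updates", {"refine_topic": {"topic": "ice cream and cats"}}),
    ("custom", {"custom_key": "Writing custom data while generating a joke"}),
    ("updates", {"generate_joke": {"joke": "This is a joke about ice cream and cats"}}),
]

# Group chunks by the mode that produced them.
by_mode = {}
for mode, chunk in chunks:
    by_mode.setdefault(mode, []).append(chunk)

print(len(by_mode["updates"]), len(by_mode["custom"]))  # 2 1
```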
