如何进行流媒体播放¶
流处理对于增强基于LLM的应用程序的响应性至关重要。通过逐步显示输出,即使在完整响应准备好之前,流处理也显著改善了用户体验(UX),特别是在处理LLM延迟时。
LangGraph内置了对流处理的一流支持。有几种不同的方法可以从图运行中流式传输输出:
"values"
:在每一步后发出状态中的所有值。"updates"
:仅在每一步后发出节点名称和由节点返回的更新。 如果在同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新将分别发出。"custom"
:使用StreamWriter
从节点内部发出自定义数据。"messages"
:按令牌分发LLM消息,并与节点内部任何LLM调用的元数据一起发出。"debug"
:在每一步中发出尽可能详细的信息的调试事件。
您可以使用graph.stream(..., stream_mode=<stream_mode>)
方法从图中流式传输输出,例如:
您还可以通过提供stream_mode
参数的列表来组合多种流模式:
设置环境¶
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
为LangGraph开发设置LangSmith
注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里。
让我们定义一个包含两个节点的简单图:
定义图¶
from typing import TypedDict
from langgraph.graph import StateGraph, START
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)
API Reference: StateGraph | START
流式处理状态中的所有值 (stream_mode="values")¶
用于在每一步之后流式传输状态中的**所有值**。
{'topic': 'ice cream'}
{'topic': 'ice cream and cats'}
{'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}
从节点流式传输状态更新(stream_mode="updates")¶
使用此功能仅流式传输节点在每一步之后返回的**状态更新**。流式传输的输出包括节点的名称以及更新内容。
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
流式调试事件(stream_mode="debug")¶
使用此功能以尽可能详细的信息流式传输**调试事件**,包括有关计划执行的任务的信息以及任务执行的结果。
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.789803+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'input': {'topic': 'ice cream'}, 'triggers': ['start:refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790013+00:00', 'step': 1, 'payload': {'id': 'eb305d74-3460-9510-d516-beed71a63414', 'name': 'refine_topic', 'error': None, 'result': [('topic', 'ice cream and cats')], 'interrupts': []}}
{'type': 'task', 'timestamp': '2025-01-28T22:06:34.790165+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'input': {'topic': 'ice cream and cats'}, 'triggers': ['refine_topic']}}
{'type': 'task_result', 'timestamp': '2025-01-28T22:06:34.790337+00:00', 'step': 2, 'payload': {'id': '74355cb8-6284-25e0-579f-430493c1bdab', 'name': 'generate_joke', 'error': None, 'result': [('joke', 'This is a joke about ice cream and cats')], 'interrupts': []}}
流式传输LLM标记 (stream_mode="messages")¶
使用此功能可以流式传输**LLM消息 token-by-token**,并为节点或任务内的任何LLM调用提供元数据。让我们修改上面的示例以包含LLM调用:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
def generate_joke(state: State):
llm_response = llm.invoke(
[
{"role": "user", "content": f"Generate a joke about {state['topic']}"}
]
)
return {"joke": llm_response.content}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)
API Reference: ChatOpenAI
for message_chunk, metadata in graph.stream(
{"topic": "ice cream"},
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
Why| did| the| cat| sit| on| the| ice| cream| cone|?
|Because| it| wanted| to| be| a| "|p|urr|-f|ect|"| scoop|!| 🍦|🐱|
{'langgraph_step': 2,
'langgraph_node': 'generate_joke',
'langgraph_triggers': ['refine_topic'],
'langgraph_path': ('__pregel_pull', 'generate_joke'),
'langgraph_checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
'checkpoint_ns': 'generate_joke:568879bc-8800-2b0d-a5b5-059526a4bebf',
'ls_provider': 'openai',
'ls_model_name': 'gpt-4o-mini',
'ls_model_type': 'chat',
'ls_temperature': 0.7}
流式自定义数据(stream_mode="custom")¶
使用此方法可以通过节点内部使用StreamWriter
流式传输自定义数据。
from langgraph.types import StreamWriter
def generate_joke(state: State, writer: StreamWriter):
writer({"custom_key": "Writing custom data while generating a joke"})
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)
API Reference: StreamWriter
配置多种流式模式¶
用于结合多种流模式。输出将以元组 (stream_mode, streamed_output)
的形式进行流式传输。
for stream_mode, chunk in graph.stream(
{"topic": "ice cream"},
stream_mode=["updates", "custom"],
):
print(f"Stream mode: {stream_mode}")
print(chunk)
print("\n")
Stream mode: updates
{'refine_topic': {'topic': 'ice cream and cats'}}
Stream mode: custom
{'custom_key': 'Writing custom data while generating a joke'}
Stream mode: updates
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}