流式传输¶
流式传输对于使LLM应用对终端用户感觉响应迅速至关重要。
在创建流式运行时,**流式模式**决定了哪些类型的数据会流式传输回API客户端。
支持的流式模式¶
LangGraph 平台支持以下流式模式:
模式 | 描述 | LangGraph 库方法 |
---|---|---|
values |
在每个超级步骤之后流式传输完整的图状态。指南 | .stream() / .astream() 带 stream_mode="values" |
updates |
在每个节点之后仅流式传输对图状态的更新。指南 | .stream() / .astream() 带 stream_mode="updates" |
messages-tuple |
流式传输图中生成的任何消息的LLM令牌(适用于聊天应用)。指南 | .stream() / .astream() 带 stream_mode="messages" |
debug |
在图执行过程中流式传输调试信息。指南 | .stream() / .astream() 带 stream_mode="debug" |
custom |
流式传输自定义数据。指南 | .stream() / .astream() 带 stream_mode="custom" |
events |
流式传输所有事件(包括图的状态);主要在迁移大型LCEL应用时有用。指南 | .astream_events() |
✅ 你也可以同时**组合使用多种模式**。有关配置细节,请参阅如何操作指南。
无状态运行¶
如果你不希望将流式运行的**输出持久化**到 checkpointer 数据库中,你可以创建一个无状态的运行,而无需创建线程:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.stream(
None, # (1)!
assistant_id,
input=inputs,
stream_mode="updates"
):
print(chunk.data)
- 我们传递的是
None
而不是thread_id
的 UUID。
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL>, apiKey: <API_KEY> });
// 创建流式运行
const streamResponse = client.runs.stream(
null, // (1)!
assistantID,
{
input,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console.log(chunk.data);
}
- 我们传递的是
None
而不是thread_id
的 UUID。
加入并流式传输¶
LangGraph 平台允许你加入一个正在进行的后台运行并流式传输其输出。为此,你可以使用 LangGraph SDK 的 client.runs.join_stream
方法:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.join_stream(
thread_id,
run_id, # (1)!
):
print(chunk)
- 这是你想要加入的现有运行的
run_id
。
输出未缓冲
当你使用 .join_stream
时,输出不会被缓冲,因此在加入之前生成的任何输出都不会被接收。
API 参考¶
有关 API 的使用和实现,请参考 API 参考。