Skip to content

时间旅行

LangGraph 提供了 时间旅行 功能,用于 从之前的检查点恢复执行 —— 无论是重播相同的状态还是修改它以探索替代方案。在所有情况下,从过去恢复执行都会在历史中产生一个 新的分支

使用时间旅行功能

要在 LangGraph 中使用时间旅行功能:

  1. 使用初始输入运行图:使用 LangGraph SDKclient.runs.waitclient.runs.stream API。
  2. 识别现有线程中的检查点:使用 client.threads.get_history 方法,获取特定 thread_id 的执行历史并定位所需的 checkpoint_id
    或者,在你希望执行暂停的节点之前设置一个断点。然后你可以找到该断点前记录的最新检查点。
  3. (可选)修改图状态:使用 client.threads.update_state 方法,在检查点处修改图的状态,并从替代状态继续执行。
  4. 从检查点恢复执行:使用 client.runs.waitclient.runs.stream API,传入 None 作为输入以及适当的 thread_idcheckpoint_id 来恢复执行。

示例

示例图
from typing_extensions import TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaver

class State(TypedDict):
    topic: NotRequired[str]
    joke: NotRequired[str]

llm = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    temperature=0,
)

def generate_topic(state: State):
    """LLM调用以生成笑话的主题"""
    msg = llm.invoke("Give me a funny topic for a joke")
    return {"topic": msg.content}

def write_joke(state: State):
    """LLM调用以根据主题编写笑话"""
    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

# 构建工作流
builder = StateGraph(State)

# 添加节点
builder.add_node("generate_topic", generate_topic)
builder.add_node("write_joke", write_joke)

# 添加边以连接节点
builder.add_edge(START, "generate_topic")
builder.add_edge("generate_topic", "write_joke")

# 编译
graph = builder.compile()

1. 运行图

from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用名为 "agent" 部署的图
assistant_id = "agent"

# 创建线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

# 运行图
result = await client.runs.wait(
    thread_id,
    assistant_id,
    input={}
)
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: <DEPLOYMENT_URL> });

// 使用名为 "agent" 部署的图
const assistantID = "agent";

// 创建线程
const thread = await client.threads.create();
const threadID = thread["thread_id"];

// 运行图
const result = await client.runs.wait(
  threadID,
  assistantID,
  { input: {}}
);

创建线程:

curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--data '{}'

运行图:

curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"input\": {}
}"

2. 识别检查点

# 状态按时间倒序返回。
states = await client.threads.get_history(thread_id)
selected_state = states[1]
print(selected_state)
// 状态按时间倒序返回。
const states = await client.threads.getHistory(threadID);
const selectedState = states[1];
console.log(selectedState);
curl --request GET \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/history \
--header 'Content-Type: application/json'

3. 更新状态(可选)

update_state 将创建一个新的检查点。新的检查点将与同一个线程相关联,但具有新的检查点 ID。

new_config = await client.threads.update_state(
    thread_id,
    {"topic": "chickens"},
    checkpoint_id=selected_state["checkpoint_id"]
)
print(new_config)
const newConfig = await client.threads.updateState(
  threadID,
  {
    values: { "topic": "chickens" },
    checkpointId: selectedState["checkpoint_id"]
  }
);
console.log(newConfig);
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/state \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"checkpoint_id\": <CHECKPOINT_ID>,
  \"values\": {\"topic\": \"chickens\"}
}"

4. 从检查点恢复执行

await client.runs.wait(
    thread_id,
    assistant_id,
    input=None,
    checkpoint_id=new_config["checkpoint_id"]
)
await client.runs.wait(
  threadID,
  assistantID,
  {
    input: null,
    checkpointId: newConfig["checkpoint_id"]
  }
);
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/runs/wait \
--header 'Content-Type: application/json' \
--data "{
  \"assistant_id\": \"agent\",
  \"checkpoint_id\": <CHECKPOINT_ID>
}"

学习更多