Skip to content

持久化执行

持久化执行 是一种技术,在该技术中,一个进程或工作流会在关键点保存其进度,从而允许它暂停,并且之后能从暂停处精确恢复执行。这在需要 人工介入 的场景中特别有用,在这些场景里,用户可以在继续执行之前检查、验证或修改流程;在可能遇到中断或错误的长时间运行任务(例如,对大语言模型的调用超时)中也很有用。通过保留已完成的工作,持久化执行使进程能够恢复执行,而无需重新处理之前的步骤 —— 即使经过了很长的延迟(例如,一周之后)。

LangGraph 内置的 持久化 层为工作流提供持久化执行功能,确保每个执行步骤的状态都保存到持久存储中。此功能保证了如果工作流被中断 —— 无论是由于系统故障还是 人工介入 交互 —— 都可以从最后记录的状态恢复执行。

Tip

如果你在使用 LangGraph 时搭配了检查点器,那么你已经启用了持久化执行功能。你可以在任何时候暂停和恢复工作流,即使在出现中断或故障之后也可以。 为了充分利用持久化执行功能,请确保你的工作流设计为 确定性的幂等的,并将任何副作用或非确定性操作封装在 任务 中。你可以使用 状态图(图 API)函数式 API 中的 任务

要求

要在 LangGraph 中利用持久化执行,你需要:

  1. 通过指定一个 检查点器 来在你的工作流中启用 持久化,该检查点器将保存工作流的进度。
  2. 在执行工作流时指定一个 线程标识符。这将跟踪工作流特定实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或有副作用的操作(例如,文件写入、API 调用)包装在 任务 中,以确保在恢复工作流时,这些操作不会针对特定运行重复执行,而是从持久化层检索它们的结果。有关更多信息,请参阅 确定性和一致重放

确定性与一致重放

当你恢复一个工作流运行时,代码**不会**从执行停止的**同一行代码**处继续执行;相反,它会识别一个合适的起始点,从该点继续之前中断的执行。这意味着工作流将从起始点开始重放所有步骤,直到到达它停止的位置。

因此,当你编写用于持久执行的工作流时,必须将任何非确定性操作(例如,随机数生成)和任何有副作用的操作(例如,文件写入、API 调用)封装在任务节点中。

为确保你的工作流具有确定性并能一致重放,请遵循以下准则:

  • 避免重复工作:如果一个节点包含多个有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作封装在一个单独的**任务**中。这样可以确保在恢复工作流时,这些操作不会重复执行,并且它们的结果会从持久化层中获取。
  • 封装非确定性操作:将任何可能产生非确定性结果的代码(例如,随机数生成)封装在**任务**或**节点**中。这样可以确保在恢复工作流时,它会按照记录的步骤序列精确执行,并产生相同的结果。
  • 使用幂等操作:尽可能确保副作用操作(例如,API 调用、文件写入)是幂等的。这意味着如果工作流中的某个操作失败后重试,它产生的效果与第一次执行时相同。这对于导致数据写入的操作尤为重要。如果一个**任务**开始但未能成功完成,工作流恢复时会重新运行该**任务**,并依靠记录的结果来保持一致性。使用幂等性键或验证现有结果,以避免意外的重复操作,确保工作流执行顺畅且可预测。

有关一些需要避免的陷阱示例,请参阅函数式 API 中的常见陷阱部分,该部分展示了如何使用**任务**来组织代码以避免这些问题。相同的原则也适用于状态图(图 API)

在节点中使用任务

如果一个节点包含多个操作,你可能会发现将每个操作转换为一个**任务**,而不是将这些操作重构为单独的节点,会更加容易。

from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个 TypedDict 来表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """进行 API 请求的示例节点。"""
    result = requests.get(state['url']).text[:100]  # 副作用
    return {
        "result": result
    }

# 创建一个 StateGraph 构建器,并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将起始节点和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定一个检查点保存器
checkpointer = MemorySaver()

# 使用检查点保存器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义一个包含线程 ID 的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个 TypedDict 来表示状态
class State(TypedDict):
    urls: list[str]
    result: NotRequired[list[str]]


@task
def _make_request(url: str):
    """发起请求。"""
    return requests.get(url).text[:100]

def call_api(state: State):
    """进行 API 请求的示例节点。"""
    requests = [_make_request(url) for url in state['urls']]
    results = [request.result() for request in requests]
    return {
        "results": results
    }

# 创建一个 StateGraph 构建器,并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将起始节点和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定一个检查点保存器
checkpointer = MemorySaver()

# 使用检查点保存器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义一个包含线程 ID 的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)

恢复工作流

在工作流中启用持久执行后,您可以在以下场景中恢复执行:

  • 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新后的状态恢复工作流。有关更多详细信息,请参阅 人工介入
  • 从故障中恢复: 在发生异常(例如,大语言模型(LLM)提供商服务中断)后,从最后一个成功的检查点自动恢复工作流。这需要通过提供 None 作为输入值,使用相同的线程标识符来执行工作流(请参阅使用函数式 API 的此 示例)。

恢复工作流的起始点

  • 如果你正在使用 StateGraph(图 API),起始点是执行停止处的 节点 的起始位置。
  • 如果你在一个节点内进行子图调用,起始点将是调用了被暂停子图的 节点。 在子图内部,起始点将是执行停止处的特定 节点
  • 如果你正在使用函数式 API,起始点是执行停止处的 入口点 的起始位置。

Comments