Skip to content

可靠执行

**可靠执行**是一种技术,其中进程或工作流在关键点保存其进度,使其可以暂停并在之后从停止的地方精确恢复。这在需要人工介入的场景中特别有用,用户可以在继续之前检查、验证或修改进程,并且对于可能遇到中断或错误的长时间任务(例如调用LLM超时)也很有用。通过保留已完成的工作,可靠执行使进程能够在不重新处理先前步骤的情况下恢复——即使在发生重大延迟后(例如一周后)。

LangGraph内置的persistence层为工作流提供可靠执行,确保每个执行步骤的状态都保存到持久化存储中。这种能力保证了如果工作流被中断——无论是由于系统故障还是人工介入交互——都可以从最后一次记录的状态恢复。

Tip

如果你使用LangGraph并启用了checkpointer,那么你已经启用了可靠执行。你可以在任何时间点暂停和恢复工作流,即使在发生中断或故障后也是如此。 为了充分利用可靠执行,请确保你的工作流设计为确定性幂等性,并将任何副作用或非确定性操作封装在tasks中。你可以从StateGraph (Graph API)Functional API中使用tasks

要求

要在 LangGraph 中利用持久化执行,你需要:

  1. 通过指定一个 checkpointer 来在你的工作流中启用 persistence,该 checkpointer 将保存工作流的进度。
  2. 在执行工作流时指定一个 thread identifier。这将跟踪特定工作流实例的执行历史。
  3. 将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)封装在 tasks 中,以确保当工作流恢复时,这些操作不会在特定运行中重复执行,而是从持久化层检索其结果。有关更多信息,请参阅 Determinism and Consistent Replay

确定性与一致性重放

当你恢复一个工作流运行时,代码不会从**相同的代码行**继续执行;相反,它会找到一个合适的起始点,从该点继续之前停止的位置。这意味着工作流将从起始点重新播放所有步骤,直到达到停止的点。

因此,当你编写用于持久化执行的工作流时,必须将任何非确定性操作(例如随机数生成)和任何具有副作用的操作(例如文件写入、API 调用)封装在 任务节点 中。

为了确保你的工作流是确定性的,并且可以被一致地重放,请遵循以下准则:

  • 避免重复工作:如果 节点 包含多个具有副作用的操作(例如日志记录、文件写入或网络调用),请将每个操作分别封装在单独的 任务 中。这确保了当工作流恢复时,这些操作不会被重复执行,其结果将从持久化层中检索。
  • 封装非确定性操作:将可能产生非确定性结果的任何代码(例如随机数生成)封装在 任务节点 中。这确保了在恢复时,工作流将严格按照记录的步骤顺序执行并产生相同的结果。
  • 使用幂等操作:尽可能确保副作用(例如 API 调用、文件写入)是幂等的。这意味着如果在工作流失败后重试某个操作,其效果应与第一次执行时相同。这对于导致数据写入的操作尤为重要。如果某个 任务 开始但未能成功完成,工作流恢复时将重新运行该 任务,依赖记录的结果来保持一致性。使用幂等键或验证现有结果,以避免意外的重复,确保工作流执行的顺畅和可预测性。

有关需要避免的一些常见错误示例,请参阅功能 API 中的 常见陷阱 部分,其中展示了如何使用 任务 来构建代码以避免这些问题。同样的原则也适用于 StateGraph (图 API)

在节点中使用任务

如果一个 节点 包含多个操作,你可能会发现将每个操作转换为 任务 比将其重构为单独的节点更容易。

from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个TypedDict来表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """执行API请求的示例节点。"""
    result = requests.get(state['url']).text[:100]  # 副作用
    return {
        "result": result
    }

# 创建StateGraph构建器并添加call_api函数的节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到call_api节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点
checkpointer = MemorySaver()

# 使用检查点编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义带有线程ID的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个TypedDict来表示状态
class State(TypedDict):
    urls: list[str]
    result: NotRequired[list[str]]


@task
def _make_request(url: str):
    """执行请求。"""
    return requests.get(url).text[:100]

def call_api(state: State):
    """执行API请求的示例节点。"""
    requests = [_make_request(url) for url in state['urls']]
    results = [request.result() for request in requests]
    return {
        "results": results
    }

# 创建StateGraph构建器并添加call_api函数的节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到call_api节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点
checkpointer = MemorySaver()

# 使用检查点编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义带有线程ID的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)

恢复工作流

一旦在工作流中启用了持久化执行,您可以恢复以下场景中的执行:

  • 暂停和恢复工作流: 使用 [interrupt][langgraph.types.interrupt] 函数在特定点暂停工作流,并使用 [Command][langgraph.types.Command] 原语通过更新的状态来恢复它。有关更多细节,请参阅 人工介入循环
  • 从故障中恢复: 在发生异常(例如 LLM 提供商中断)后,自动从上一次成功的检查点恢复工作流。这需要通过提供一个 None 作为输入值,用相同的线程标识符执行工作流(请参见使用函数式 API 的这个 示例)。

恢复工作流的起点

  • 如果你使用的是 StateGraph (Graph API),起点是执行停止的 节点 的开始。
  • 如果你在某个节点中调用了一个子图,起点将是调用被暂停的子图的 节点。 在子图内部,起点将是执行停止的特定 节点
  • 如果你使用的是 Functional API,起点是执行停止的 entrypoint 的开始。