Skip to content

如何为子图添加线程级持久性

前提条件

本指南假设你熟悉以下内容:

本指南展示了如何为使用子图的图添加线程级持久化功能。

安装设置

首先,让我们安装所需的软件包。

%%capture --no-stderr
%pip install -U langgraph

为 LangGraph 开发设置 LangSmith

注册 LangSmith 以快速发现问题并提升你的 LangGraph 项目的性能。LangSmith 允许你使用跟踪数据来调试、测试和监控使用 LangGraph 构建的大语言模型应用程序 — 点击 此处 了解更多关于如何开始使用的信息。

定义具有持久化功能的图

要为包含子图的图添加持久化功能,你只需在**编译父图时**传入一个检查点保存器。LangGraph 会自动将该检查点保存器传播到子图。

Note

在编译子图时,**不应提供**检查点保存器。相反,你必须定义一个**单一**的检查点保存器,并将其传递给 parent_graph.compile(),LangGraph 会自动将该检查点保存器传播到子图。如果你将检查点保存器传递给 subgraph.compile(),它将被直接忽略。这同样适用于你添加调用子图的节点函数的情况。

让我们定义一个包含单个子图节点的简单图,以展示如何进行此操作。

from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict


# subgraph


class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str


def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}


def subgraph_node_2(state: SubgraphState):
    # note that this node is using a state key ('bar') that is only available in the subgraph
    # and is sending update on the shared state key ('foo')
    return {"foo": state["foo"] + state["bar"]}


subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()


# parent graph


class State(TypedDict):
    foo: str


def node_1(state: State):
    return {"foo": "hi! " + state["foo"]}


builder = StateGraph(State)
builder.add_node("node_1", node_1)
# note that we're adding the compiled subgraph as a node to the parent graph
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
<langgraph.graph.state.StateGraph at 0x106d2fa10>

现在,我们可以使用内存检查点保存器(MemorySaver)来编译计算图。

checkpointer = MemorySaver()
# You must only pass checkpointer when compiling the parent graph.
# LangGraph will automatically propagate the checkpointer to the child subgraphs.
graph = builder.compile(checkpointer=checkpointer)

验证持久化功能是否正常工作

现在,让我们运行图并检查父图和子图的持久化状态,以验证持久化是否正常工作。我们应该能在 state.values 中看到父图和子图的最终执行结果。

config = {"configurable": {"thread_id": "1"}}

for _, chunk in graph.stream({"foo": "foo"}, config, subgraphs=True):
    print(chunk)
{'node_1': {'foo': 'hi! foo'}}
{'subgraph_node_1': {'bar': 'bar'}}
{'subgraph_node_2': {'foo': 'hi! foobar'}}
{'node_2': {'foo': 'hi! foobar'}}
现在,我们可以通过使用调用图时所用的相同配置调用 graph.get_state() 来查看父图的状态。

graph.get_state(config).values
{'foo': 'hi! foobar'}

要查看子图状态,我们需要做两件事:

  1. 找到子图的最新配置值
  2. 使用 graph.get_state() 来检索最新子图配置的该值。

要找到正确的配置,我们可以检查父图的状态历史记录,并在从 node_2(包含子图的节点)返回结果之前找到状态快照:

state_with_subgraph = [
    s for s in graph.get_state_history(config) if s.next == ("node_2",)
][0]

状态快照将包含接下来要执行的 tasks 列表。使用子图时,tasks 将包含可用于检索子图状态的配置:

subgraph_config = state_with_subgraph.tasks[0].state
subgraph_config
{'configurable': {'thread_id': '1',
  'checkpoint_ns': 'node_2:6ef111a6-f290-7376-0dfc-a4152307bc5b'}}
graph.get_state(subgraph_config).values
{'foo': 'hi! foobar', 'bar': 'bar'}

如果你想了解更多关于如何为人工介入的工作流修改子图状态的信息,请查看这篇操作指南

Comments