持久化¶
LangGraph 内置了持久化层,通过检查点(checkpointer)实现。当你使用检查点编译图时,检查点会在每个超级步骤中保存图状态的 checkpoint
。这些检查点被保存到一个 thread
中,在图执行后可以访问。由于 threads
允许在执行后访问图的状态,因此可以实现多种强大的功能,包括人机协作、记忆、时间旅行和容错。有关如何添加和使用检查点的端到端示例,请参阅 此操作指南。下面,我们将更详细地讨论这些概念。
LangGraph API 自动处理检查点
使用 LangGraph API 时,你不需要手动实现或配置检查点。API 在后台自动处理所有持久化基础设施。
线程¶
线程是每个检查点保存器分配给每个检查点的唯一 ID 或 线程标识符。当使用带有检查点器的图进行调用时,你**必须**在 config 的 configurable
部分中指定一个 thread_id
:
Checkpoints¶
检查点是每个超级步骤中图状态的快照,由具有以下关键属性的 StateSnapshot
对象表示:
config
: 与此检查点相关联的配置。metadata
: 与此检查点相关联的元数据。values
: 此时刻状态通道的值。next
: 在图中接下来要执行的节点名称的元组。tasks
: 包含有关下一步任务信息的PregelTask
对象的元组。如果该步骤之前尝试过,它将包含错误信息。如果从节点内部动态地中断了图,则任务将包含与中断相关的附加数据。
让我们看看当以如下方式调用一个简单图时会保存哪些检查点:
API Reference: StateGraph | START | END | InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: str
bar: Annotated[list[str], add]
def node_a(state: State):
return {"foo": "a", "bar": ["a"]}
def node_b(state: State):
return {"foo": "b", "bar": ["b"]}
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
在我们运行图后,我们期望看到正好4个检查点:
- 空检查点,下一个要执行的节点为
START
- 检查点带有用户输入
{'foo': '', 'bar': []}
,下一个要执行的节点为node_a
- 检查点带有
node_a
的输出{'foo': 'a', 'bar': ['a']}
,下一个要执行的节点为node_b
- 检查点带有
node_b
的输出{'foo': 'b', 'bar': ['a', 'b']}
,没有下一个要执行的节点
请注意,由于我们为 bar
通道指定了一个 reducer,因此 bar
通道的值包含了两个节点的输出。
获取状态¶
在与保存的图状态进行交互时,你**必须**指定一个 线程标识符。你可以通过调用 graph.get_state(config)
来查看图的*最新*状态。这将返回一个 StateSnapshot
对象,对应于配置中提供的线程 ID 相关联的最新检查点,或者如果提供了检查点 ID,则对应于线程的特定检查点。
# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
# 获取特定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state
的输出将如下所示:
StateSnapshot(
values={'foo': 'b', 'bar': ['a', 'b']},
next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
created_at='2024-08-29T19:19:38.821749+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)
获取状态历史¶
你可以通过调用 graph.get_state_history(config)
来获取给定线程的图执行完整历史。这将返回与配置中提供的线程 ID 相关联的一系列 StateSnapshot
对象。重要的是,这些检查点将按时间顺序排列,最近的检查点 / StateSnapshot
将是列表中的第一个。
在我们的示例中,get_state_history
的输出将如下所示:
[
StateSnapshot(
values={'foo': 'b', 'bar': ['a', 'b']},
next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
created_at='2024-08-29T19:19:38.821749+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
tasks=(),
),
StateSnapshot(
values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
created_at='2024-08-29T19:19:38.819946+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
),
StateSnapshot(
values={'foo': '', 'bar': []},
next=('node_a',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
created_at='2024-08-29T19:19:38.819946+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
),
StateSnapshot(
values={'foo': '', 'bar': []},
next=('node_a',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
metadata={'source': 'loop', 'writes': None, 'step': 0},
created_at='2024-08-29T19:19:38.817813+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
),
StateSnapshot(
values={'bar': []},
next=('__start__',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
created_at='2024-08-29T19:19:38.816205+00:00',
parent_config=None,
tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
)
]
重放¶
还可以回放先前的图执行。如果我们使用 thread_id
和 checkpoint_id
调用图的 invoke
,那么我们将 重放 在 checkpoint_id
对应的检查点之前的已执行步骤,并且只执行检查点之后的步骤。
thread_id
是线程的 ID。checkpoint_id
是指向线程内特定检查点的标识符。
在调用图时,必须作为配置中 configurable
部分的一部分传递这些参数:
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
重要的是,LangGraph 会知道某个特定步骤是否已经执行过。如果已经执行过,LangGraph 只是 重放 图中的该特定步骤,而不会重新执行该步骤,但仅限于在提供的 checkpoint_id
之前的步骤。所有在 checkpoint_id
之后的步骤都会被执行(即,新的分支),即使它们之前已经被执行过。了解更多关于重放的信息,请参阅此 如何操作指南:时间旅行。
更新状态¶
除了从特定 checkpoints
重新播放图之外,我们还可以 编辑 图的状态。我们使用 graph.update_state()
来实现这一点。这个方法接受三个不同的参数:
config
¶
配置应包含指定要更新的线程的 thread_id
。当只传递 thread_id
时,我们会更新(或分叉)当前状态。可选地,如果我们包括 checkpoint_id
字段,那么我们将分叉所选择的检查点。
values
¶
这些是要用来更新状态的值。请注意,这种更新被视为任何节点更新的方式一样。这意味着这些值将被传递到 reducer 函数,如果某些通道在图状态中有定义的 reducer。这意味着 update_state
不会自动覆盖每个通道的通道值,而是只覆盖那些没有 reducer 的通道。让我们通过一个例子来说明这一点。
假设你用以下模式定义了你的图状态(请参见上面的完整示例):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
现在假设图的当前状态为:
如果你更新状态如下:
那么图的新状态将是:
foo
键(通道)被完全更改(因为没有为该通道指定 reducer,所以 update_state
覆盖了它)。然而,bar
键有一个指定的 reducer,因此它会将 "b"
追加到 bar
的状态中。
as_node
¶
当你调用 update_state
时,最后可以可选地指定 as_node
。如果你提供了它,更新将被视为来自节点 as_node
。如果没有提供 as_node
,则将其设置为最后一个更新状态的节点(如果无歧义的话)。这很重要,因为下一步要执行的步骤取决于最后一个给出更新的节点,因此可以用来控制哪个节点接下来执行。了解更多关于分叉状态的信息,请参阅此 如何操作指南:时间旅行。
内存存储¶
一个state schema指定了在图执行过程中被填充的一组键。如上所述,检查点器可以在每个图步骤中将状态写入线程,从而实现状态的持久化。
但是,如果我们想要跨线程保留一些信息怎么办?考虑这样一个聊天机器人的情况:我们希望在整个用户的所有聊天对话(例如线程)中保留有关该用户的特定信息!
仅使用检查点器,我们无法在线程之间共享信息。这促使了 Store
接口的出现。作为示例,我们可以定义一个 InMemoryStore
来跨线程存储关于用户的信息。我们只需像以前一样用检查点器编译我们的图,并且使用新的 in_memory_store
变量。
LangGraph API 自动处理存储
使用 LangGraph API 时,你不需要手动实现或配置存储。API 在后台为你处理所有存储基础设施。
基本用法¶
首先,让我们不使用 LangGraph 单独展示这一点。
内存是通过一个 tuple
进行命名空间划分的,在这个具体示例中,它将是 (<user_id>, "memories")
。命名空间可以是任意长度,并表示任何内容,不一定是用户特定的。
我们使用 store.put
方法将内存保存到存储中的命名空间。当我们这样做时,我们需要指定上述定义的命名空间,以及一个用于内存的键值对:键只是一个唯一的标识符(memory_id
),而值(一个字典)就是内存本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search
方法从我们的命名空间中读取内存,这将返回给定用户的全部内存作为一个列表。最近的内存位于列表的最后。
memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
'namespace': ['1', 'memories'],
'created_at': '2024-10-02T17:22:31.590602+00:00',
'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种类型的内存都是一个 Python 类 (Item
),具有某些属性。我们可以通过 .dict
将其转换为字典来访问它,如上所示。
它具有的属性包括:
value
: 该内存的值(本身是一个字典)key
: 该命名空间中此内存的唯一键namespace
: 字符串列表,该内存类型的命名空间created_at
: 此内存创建的时间戳updated_at
: 此内存更新的时间戳
语义搜索¶
除了简单的检索,存储还支持语义搜索,允许您根据含义而不是精确匹配查找记忆。为此,请使用嵌入模型配置存储:
API Reference: init_embeddings
from langchain.embeddings import init_embeddings
store = InMemoryStore(
index={
"embed": init_embeddings("openai:text-embedding-3-small"), # 嵌入提供者
"dims": 1536, # 嵌入维度
"fields": ["food_preference", "$"] # 要嵌入的字段
}
)
现在在进行搜索时,您可以使用自然语言查询找到相关记忆:
# 查找关于食物偏好的记忆
# (这可以在将记忆放入存储后进行)
memories = store.search(
namespace_for_memory,
query="What does the user like to eat?",
limit=3 # 返回前3个匹配项
)
您可以通过配置 fields
参数或在存储记忆时指定 index
参数来控制哪些部分的记忆会被嵌入:
# 存储带有特定字段以进行嵌入
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{
"food_preference": "I love Italian cuisine",
"context": "Discussing dinner plans"
},
index=["food_preference"] # 仅嵌入 "food_preferences" 字段
)
# 存储时不进行嵌入(仍然可检索,但不可搜索)
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{"system_info": "Last updated: 2024-01-01"},
index=False
)
在 LangGraph 中使用¶
有了这些准备,我们在 LangGraph 中使用 in_memory_store
。in_memory_store
与检查点器协同工作:如上所述,检查点器将状态保存到线程,而 in_memory_store
允许我们存储任意信息以便在不同线程之间访问。我们如下所示地将图与检查点器和 in_memory_store
编译在一起。
API Reference: InMemorySaver
from langgraph.checkpoint.memory import InMemorySaver
# 我们需要这个,因为我们想启用线程(对话)
checkpointer = InMemorySaver()
# ... 定义图 ...
# 使用检查点器和存储编译图
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
我们像以前一样使用 thread_id
调用图,并且也使用 user_id
,我们将用它来命名空间化我们的记忆,就像我们上面所展示的那样。
# 调用图
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}
# 首先,我们向AI打招呼
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
print(update)
我们可以在*任何节点*中通过传递 store: BaseStore
和 config: RunnableConfig
作为节点参数来访问 in_memory_store
和 user_id
。以下是我们在节点中使用语义搜索的方法:
def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]
# 对记忆进行命名空间划分
namespace = (user_id, "memories")
# ... 分析对话并创建新记忆
# 创建一个新的记忆ID
memory_id = str(uuid.uuid4())
# 我们创建一个新的记忆
store.put(namespace, memory_id, {"memory": memory})
正如我们上面所示,我们也可以在任何节点中访问存储,并使用 store.search
方法获取记忆。记住,记忆是以对象列表的形式返回的,这些对象可以转换为字典。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
'namespace': ['1', 'memories'],
'created_at': '2024-10-02T17:22:31.590602+00:00',
'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问这些记忆并在模型调用中使用它们。
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
# 从配置中获取用户ID
user_id = config["configurable"]["user_id"]
# 对记忆进行命名空间划分
namespace = (user_id, "memories")
# 根据最新的消息进行搜索
memories = store.search(
namespace,
query=state["messages"][-1].content,
limit=3
)
info = "\n".join([d.value["memory"] for d in memories])
# ... 在模型调用中使用记忆
如果我们创建了一个新的线程,只要 user_id
相同,我们仍然可以访问相同的记忆。
# 调用图
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
# 再次打招呼
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
print(update)
当我们使用 LangGraph 平台时,无论是本地(例如在 LangGraph Studio 中)还是使用 LangGraph 平台,基础存储默认可用,无需在图编译期间指定。但是,要启用语义搜索,你需要在你的 langgraph.json
文件中配置索引设置。例如:
{
...
"store": {
"index": {
"embed": "openai:text-embeddings-3-small",
"dims": 1536,
"fields": ["$"]
}
}
}
有关更多详细信息和配置选项,请参阅 部署指南。
Checkpointer 库¶
在幕后,检查点功能由符合 BaseCheckpointSaver 接口的 checkpointer 对象实现。LangGraph 提供了多种 checkpointer 实现,所有实现都通过独立、可安装的库进行:
langgraph-checkpoint
: 用于 checkpointer 存储器的基本接口 (BaseCheckpointSaver) 和序列化/反序列化接口 (SerializerProtocol)。包含内存中 checkpointer 实现 (InMemorySaver),适用于实验。LangGraph 默认包含langgraph-checkpoint
。langgraph-checkpoint-sqlite
: 使用 SQLite 数据库的 LangGraph checkpointer 实现 (SqliteSaver / AsyncSqliteSaver)。适合实验和本地工作流。需要单独安装。langgraph-checkpoint-postgres
: 使用 Postgres 数据库的高级 checkpointer (PostgresSaver / AsyncPostgresSaver),用于 LangGraph 平台。适合生产环境使用。需要单独安装。
Checkpointer 接口¶
每个 checkpointer 都符合 BaseCheckpointSaver 接口,并实现以下方法:
.put
- 带有配置和元数据存储检查点。.put_writes
- 存储与检查点相关联的中间写入(即待处理写入)。.get_tuple
- 使用给定的配置 (thread_id
和checkpoint_id
) 获取检查点元组。这用于填充graph.get_state()
中的StateSnapshot
。.list
- 列出匹配给定配置和过滤条件的检查点。这用于填充graph.get_state_history()
中的状态历史记录。
如果使用异步图执行(即通过 .ainvoke
, .astream
, .abatch
执行图),将使用上述方法的异步版本(.aput
, .aput_writes
, .aget_tuple
, .alist
)。
Note
要异步运行您的图,可以使用 InMemorySaver
或 SQLite/Postgres 检查点器的异步版本 -- AsyncSqliteSaver
/ AsyncPostgresSaver
检查点器。
序列化器¶
当 checkpointer 存储图状态时,它们需要对状态中的通道值进行序列化。这是通过序列化器对象完成的。
langgraph_checkpoint
定义了用于实现序列化器的 协议,并提供了一个默认实现 (JsonPlusSerializer),它支持广泛的各种类型,包括 LangChain 和 LangGraph 的基本类型、日期时间、枚举等。
功能¶
人机协作¶
首先,检查点(checkpointers)通过允许人类检查、中断和批准图的步骤,从而促进人机协作工作流。这些工作流需要检查点,因为人类必须能够在任何时间点查看图的状态,并且在人类对状态进行任何更新后,图必须能够恢复执行。请参阅这些操作指南,以获取具体的示例。
记忆¶
其次,检查点允许在交互之间使用"记忆"。在重复的人类交互情况下(如对话),后续消息可以发送到该线程,它将保留对之前消息的记忆。请参阅此操作指南,以了解如何使用检查点添加和管理对话记忆的端到端示例。
时间旅行¶
第三,检查点允许"时间旅行",使用户能够重播之前的图执行,以查看和/或调试特定的图步骤。此外,检查点使得可以在任意检查点分叉图状态,以探索替代路径。
容错性¶
最后,检查点还提供容错性和错误恢复功能:如果在某个超级步骤中一个或多个节点失败,可以从上一个成功步骤重新启动图。此外,当某个超级步骤中的图节点在执行过程中失败时,LangGraph会存储该超级步骤中其他已完成成功的节点的待处理检查点写入,因此当我们从该超级步骤恢复图执行时,不会重新运行那些成功的节点。
待处理写入¶
此外,当某个超级步骤中的图节点在执行过程中失败时,LangGraph会存储该超级步骤中其他已完成成功的节点的待处理检查点写入,因此当我们从该超级步骤恢复图执行时,不会重新运行那些成功的节点。