Skip to content

如何添加线程级持久化(功能 API)

前提条件

本指南假定您熟悉以下内容:

LangGraph API 用户无需此操作

如果您使用的是 LangGraph API,则不需要手动实现检查点。API 会自动为您处理检查点。当您在自己的自定义服务器中实现 LangGraph 时,本指南才相关。

许多 AI 应用需要内存来在同一个 线程 的多个交互之间共享上下文(例如,对话的多轮)。在 LangGraph 功能 API 中,可以使用 线程级持久化 将这种内存添加到任何 入口点 工作流中。

创建 LangGraph 工作流时,可以通过使用 检查点器 来设置其结果的持久化:

  1. 创建一个检查点器的实例:

    from langgraph.checkpoint.memory import MemorySaver
    
    checkpointer = MemorySaver()       
    
  2. checkpointer 实例传递给 entrypoint() 装饰器:

    from langgraph.func import entrypoint
    
    @entrypoint(checkpointer=checkpointer)
    def workflow(inputs)
        ...
    
  3. 可选地,在工作流函数签名中暴露 previous 参数:

    @entrypoint(checkpointer=checkpointer)
    def workflow(
        inputs,
        *,
        # 您可以在工作流函数签名中选择性地指定 `previous`
        # 以访问上一次执行时的工作流返回值
        previous
    ):
        previous = previous or []
        combined_inputs = previous + inputs
        result = do_something(combined_inputs)
        ...
    
  4. 可选地,选择哪些值将从工作流返回,哪些值将被检查点器保存为 previous

    @entrypoint(checkpointer=checkpointer)
    def workflow(inputs, *, previous):
        ...
        result = do_something(...)
        return entrypoint.final(value=result, save=combine(inputs, result))
    

本指南展示了如何将线程级持久化添加到您的工作流中。

注意

如果您需要 跨多个对话或用户共享 的内存(跨线程持久化),请查看这个操作指南

注意

如果您需要向 StateGraph 添加线程级持久化,请查看这个操作指南

设置

首先我们需要安装所需的软件包

pip install --quiet -U langgraph langchain_anthropic

接下来,我们需要为Anthropic(我们将使用的LLM)设置API密钥。

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")

为 LangGraph 开发设置 LangSmith

注册 LangSmith 以快速发现并提升你的 LangGraph 项目性能。LangSmith 允许你使用追踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 —— 了解如何入门,请阅读 此处 的更多信息。

示例:带短期记忆的简单聊天机器人

我们将使用一个包含单个任务的工作流,该任务会调用一个聊天模型

首先,我们来定义将要使用的模型:

API Reference: ChatAnthropic

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

现在我们可以定义我们的任务和工作流。为了添加持久化功能,我们需要向 entrypoint() 装饰器传递一个 Checkpointer

API Reference: BaseMessage | add_messages | entrypoint | task | MemorySaver

from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver


@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response


checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

如果我们尝试使用此工作流程,对话的上下文将在交互中保持持久化:

Note

如果你使用的是 LangGraph 平台或 LangGraph Studio,你 不需要 在入口点装饰器中传递 checkpointer,因为它会自动完成。

我们现在可以与代理进行交互,并看到它能够记住之前的对话!

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hi Bob! I'm Claude. Nice to meet you! How are you today?

你可以随时恢复之前的线程:

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.

如果我们想要开启一个新对话,可以传入一个不同的 thread_id。变魔术般地!所有的记忆都消失了!

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream(
    [input_message],
    {"configurable": {"thread_id": "2"}},
    stream_mode="values",
):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't know your name unless you tell me. Each conversation I have starts fresh, so I don't have access to any previous interactions or personal information unless you share it with me.

流式传输 tokens

如果您希望从聊天机器人中流式传输 LLM tokens,可以使用 stream_mode="messages"。请查看这个 操作指南 以了解更多信息。