Skip to content

如何添加线程级持久性(功能API)

先决条件

本指南假设您熟悉以下内容:

许多AI应用程序需要内存来在同一个线程(例如,对话的多个回合)上共享上下文。在LangGraph功能API中,可以使用线程级持久性将这种内存添加到任何入口点工作流中。

在创建LangGraph工作流时,您可以使用检查点器设置其结果持久化:

  1. 创建一个检查点器的实例:

    from langgraph.checkpoint.memory import MemorySaver
    
    checkpointer = MemorySaver()       
    
  2. 将检查点器实例传递给entrypoint()装饰器:

    from langgraph.func import entrypoint
    
    @entrypoint(checkpointer=checkpointer)
    def workflow(inputs)
        ...
    
  3. 可选地在工作流函数签名中暴露previous参数:

    @entrypoint(checkpointer=checkpointer)
    def workflow(
        inputs,
        *,
        # 您可以可选地在工作流函数签名中指定`previous`,
        # 以访问上次执行时的工作流返回值
        previous
    ):
        previous = previous or []
        combined_inputs = previous + inputs
        result = do_something(combined_inputs)
        ...
    
  4. 可选地选择从工作流返回哪些值以及由检查点器保存哪些值为previous

    @entrypoint(checkpointer=checkpointer)
    def workflow(inputs, *, previous):
        ...
        result = do_something(...)
        return entrypoint.final(value=result, save=combine(inputs, result))
    

本指南展示了如何将线程级持久性添加到您的工作流中。

注意

如果您需要跨多个对话或用户共享的内存(跨线程持久性),请参阅此如何操作指南

注意

如果您需要将线程级持久性添加到StateGraph,请参阅此如何操作指南

环境搭建

首先我们需要安装所需的包

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_anthropic

接下来,我们需要为Anthropic(我们将使用的大型语言模型)设置API密钥。

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")

为LangGraph开发设置LangSmith

注册LangSmith可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里

示例:具有短期记忆的简单聊天机器人

我们将使用一个包含单个任务的工作流,该任务调用一个聊天模型

首先,我们定义将要使用的模型:

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

API Reference: ChatAnthropic

现在我们可以定义我们的任务和工作流。为了添加持久性,我们需要传递一个Checkpointer入口点装饰器

from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver


@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response


checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

API Reference: BaseMessage | add_messages | entrypoint | task | MemorySaver

如果我们尝试使用此工作流,对话的上下文将在交互之间得以保留:

Note

如果您使用的是LangGraph Cloud或LangGraph Studio,则__不需要__将检查点传递给入口点装饰器,因为这是自动完成的。

我们现在可以与代理交互,并看到它记得之前的对话内容!

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hi Bob! I'm Claude. Nice to meet you! How are you today?
您可以随时恢复之前的线程:

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.
如果我们想要开始一个新的对话,可以传入一个不同的thread_id。瞬间!所有的聊天记录都消失了!

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream(
    [input_message],
    {"configurable": {"thread_id": "2"}},
    stream_mode="values",
):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't know your name unless you tell me. Each conversation I have starts fresh, so I don't have access to any previous interactions or personal information unless you share it with me.

流式传输代币

如果您希望从聊天机器人流式传输LLM代币,可以使用stream_mode="messages"。请参阅此操作指南以了解更多信息。

Comments