Skip to content

如何添加跨线程持久性(功能API)

先决条件

本指南假设您熟悉以下内容:

LangGraph允许您在**不同的线程**之间持久化数据。例如,您可以将用户信息(如姓名或偏好)存储在共享(跨线程)内存中,并在新线程(例如,新对话)中重复使用它们。

在使用功能API时,您可以设置使用Store接口来存储和检索内存:

  1. 创建一个Store实例

    from langgraph.store.memory import InMemoryStore, BaseStore
    
    store = InMemoryStore()
    
  2. store实例传递给entrypoint()装饰器,并在函数签名中暴露store参数:

    from langgraph.func import entrypoint
    
    @entrypoint(store=store)
    def workflow(inputs: dict, store: BaseStore):
        my_task(inputs).result()
        ...
    

在本指南中,我们将展示如何构建和使用一个具有使用Store接口实现的共享内存的工作流。

注意

在本指南中使用的Store API的支持是在LangGraph v0.2.32中添加的。

在本指南中使用的Store API的__index__和__query__参数的支持是在LangGraph v0.2.54中添加的。

注意

如果您需要向StateGraph添加跨线程持久性,请查看此如何操作指南

设置

首先,让我们安装所需的包并设置我们的API密钥

%%capture --no-stderr
%pip install -U langchain_anthropic langchain_openai langgraph
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")

使用LangSmith进行LangGraph开发

注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里

示例:具有长期记忆的简单聊天机器人

定义存储

在这个示例中,我们将创建一个工作流,能够检索用户偏好的信息。为此,我们将定义一个 InMemoryStore 对象,它可以将数据存储在内存中,并查询这些数据。

在使用 Store 接口存储对象时,你需要定义两件事:

  • 对象的命名空间,一个元组(类似于目录)
  • 对象的键(类似于文件名)

在我们的示例中,我们将使用 ("memories", <user_id>) 作为命名空间,并使用随机 UUID 作为每个新记忆的键。

重要的是,为了确定用户,我们将通过节点函数的配置关键字参数传递 user_id

让我们先来定义我们的存储吧!

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings

in_memory_store = InMemoryStore(
    index={
        "embed": OpenAIEmbeddings(model="text-embedding-3-small"),
        "dims": 1536,
    }
)

API Reference: OpenAIEmbeddings

创建工作流

import uuid

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore


model = ChatAnthropic(model="claude-3-5-sonnet-latest")


@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
    namespace = ("memories", user_id)
    last_message = messages[-1]
    memories = memory_store.search(namespace, query=str(last_message.content))
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"You are a helpful assistant talking to the user. User info: {info}"

    # Store new memories if the user asks the model to remember
    if "remember" in last_message.content.lower():
        memory = "User name is Bob"
        memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})

    response = model.invoke([{"role": "system", "content": system_msg}] + messages)
    return response


# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
    inputs: list[BaseMessage],
    *,
    previous: list[BaseMessage],
    config: RunnableConfig,
    store: BaseStore,
):
    user_id = config["configurable"]["user_id"]
    previous = previous or []
    inputs = add_messages(previous, inputs)
    response = call_model(inputs, store, user_id).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

API Reference: ChatAnthropic | RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver

Note

如果您使用的是LangGraph Cloud或LangGraph Studio,则__不需要__将store传递给入口点装饰器,因为这是自动完成的。

运行工作流!

现在让我们在配置中指定一个用户ID,并告诉模型我们的名字:

config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?

config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.
我们现在可以检查内存中的存储,并验证确实为用户保存了记忆:

for memory in in_memory_store.search(("memories", "1")):
    print(memory.value)
{'data': 'User name is Bob'}
现在让我们为另一个用户运行工作流,以验证关于第一个用户的记忆是自包含的:

config = {"configurable": {"thread_id": "3", "user_id": "2"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't have any information about your name. I can only see our current conversation without any prior context or personal details about you. If you'd like me to know your name, feel free to tell me!

Comments