
How to add cross-thread persistence (functional API)

Prerequisites

This guide assumes familiarity with the following:

  • Functional API
  • Persistence
  • Memory
  • Chat Models

LangGraph allows you to persist data across different threads. For example, you can store information about users (their names or preferences) in a shared (cross-thread) memory and reuse it in new threads (e.g., new conversations).

When using the Functional API, you can set up storing and retrieving memories by using the Store interface:

  1. Create an instance of a Store

    from langgraph.store.base import BaseStore
    from langgraph.store.memory import InMemoryStore
    
    store = InMemoryStore()
    
  2. Pass the store instance to the entrypoint() decorator and expose the store parameter in the function signature:

    from langgraph.func import entrypoint
    
    # The store passed to the decorator is injected into the function at runtime
    @entrypoint(store=store)
    def workflow(inputs: dict, store: BaseStore):
        my_task(inputs).result()
        ...
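
Putting the two steps together, here is a minimal, runnable sketch of the injection mechanism; the namespace, key, and input shape below are illustrative assumptions, not part of the API:

import uuid

from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()


@entrypoint(store=store)
def workflow(inputs: dict, store: BaseStore):
    namespace = ("memories", "user-123")  # hypothetical user id
    # Read a previously stored value (returns None if the key is absent)
    existing = store.get(namespace, "profile")
    # Write (or overwrite) a value under the same namespace and key
    store.put(namespace, "profile", {"data": inputs["fact"]})
    return {"previous": existing.value if existing else None}


print(workflow.invoke({"fact": "User likes pizza"}))  # {'previous': None}
print(workflow.invoke({"fact": "User likes sushi"}))
# {'previous': {'data': 'User likes pizza'}}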
    

In this guide, we will show how to construct and use a workflow that has shared memory implemented with the Store interface.

Note

Support for the Store API used in this guide was added in LangGraph v0.2.32.

Support for the index and query arguments of the Store API used in this guide was added in LangGraph v0.2.54.

Note

If you need to add cross-thread persistence to a StateGraph, check out this how-to guide.

Setup

First, let's install the required packages and set our API keys:

pip install -U langchain_anthropic langchain_openai langgraph

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started here.

Example: a simple chatbot with long-term memory


API Reference: RetrievalQA | ChatOpenAI | FAISS | OpenAIEmbeddings | TextLoader | CharacterTextSplitter | VectorstoreIndexCreator

from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.indexes import VectorstoreIndexCreator

# Load the document; the index creator will split it into chunks internally
loader = TextLoader("./state_of_the_union.txt")

# Create the vector store index (FAISS + OpenAI embeddings),
# splitting the document into 1000-character chunks
index_creator = VectorstoreIndexCreator(
    vectorstore_cls=FAISS,
    embedding=OpenAIEmbeddings(),
    text_splitter=CharacterTextSplitter(chunk_size=1000, chunk_overlap=0),
)
docsearch = index_creator.from_loaders([loader])

# Create the chain to answer questions over the indexed document
qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0),
    chain_type="stuff",
    retriever=docsearch.vectorstore.as_retriever(),
)

# Ask it a question
qa_chain.run("What did the president say about Congress?")


Notes

  • This is a simple example that demonstrates how to use VectorstoreIndexCreator to build a vector store index from a document and then use it to answer questions.
  • Because the index is built once and reused, later questions in the session can draw on the document's content; the chain itself does not store conversational memory.
  • This is a very basic example; you can improve it with more advanced techniques such as persisting the index (see the sketch below), fine-tuning the model, or using a more sophisticated vector store.
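
For instance, a minimal sketch of persisting the FAISS index to disk so later runs can reload it instead of re-embedding the document; the folder name is a hypothetical choice, and this assumes the legacy langchain FAISS save_local/load_local API:

from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

# Save the index built above to a local folder
docsearch.vectorstore.save_local("faiss_index")

# In a later run, reload the index instead of re-embedding the document
restored = FAISS.load_local("faiss_index", OpenAIEmbeddings())
qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0),
    chain_type="stuff",
    retriever=restored.as_retriever(),
)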

Define store

In this example we will create a workflow that is able to retrieve information about a user's preferences. We will do so by defining an InMemoryStore, an object that can store data in memory and query that data.

When storing objects using the Store interface, you define two things:

  • the namespace for the object, a tuple (similar to a directory)
  • the object key (similar to a filename)

In our example, we will be using ("memories", <user_id>) as the namespace and a random UUID as the key for each new memory.

Importantly, to determine the user, we will be passing the user_id via the config keyword argument of the node function.
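
Before wiring this into a workflow, here is a minimal sketch of the namespace/key mechanics in isolation; the user id and stored value are illustrative assumptions:

import uuid

from langgraph.store.memory import InMemoryStore

demo_store = InMemoryStore()
namespace = ("memories", "1")  # ("memories", <user_id>)
key = str(uuid.uuid4())

# Store a memory under the namespace/key pair
demo_store.put(namespace, key, {"data": "User name is Bob"})

# Retrieve it by key, or list everything under the namespace
print(demo_store.get(namespace, key).value)  # {'data': 'User name is Bob'}
print([item.value for item in demo_store.search(namespace)])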

Let's first define our store!

API Reference: OpenAIEmbeddings

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings

# Index stored memories with OpenAI embeddings so they can be searched semantically
in_memory_store = InMemoryStore(
    index={
        "embed": OpenAIEmbeddings(model="text-embedding-3-small"),
        "dims": 1536,
    }
)
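
Because the store is indexed, search(namespace, query=...) ranks items by embedding similarity instead of returning them in insertion order. A quick sketch against a throwaway store (the stored strings and query are illustrative; running this calls the OpenAI embeddings API):

import uuid

from langchain_openai import OpenAIEmbeddings
from langgraph.store.memory import InMemoryStore

demo = InMemoryStore(
    index={"embed": OpenAIEmbeddings(model="text-embedding-3-small"), "dims": 1536}
)
namespace = ("memories", "1")
demo.put(namespace, str(uuid.uuid4()), {"data": "User name is Bob"})
demo.put(namespace, str(uuid.uuid4()), {"data": "User likes pizza"})

# The query is embedded and stored items are ranked by similarity
results = demo.search(namespace, query="what food does the user enjoy?", limit=1)
print(results[0].value)  # most likely {'data': 'User likes pizza'}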

Create the workflow

API Reference: ChatAnthropic | RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver

import uuid

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore


model = ChatAnthropic(model="claude-3-5-sonnet-latest")


@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
    namespace = ("memories", user_id)
    last_message = messages[-1]
    memories = memory_store.search(namespace, query=str(last_message.content))
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"You are a helpful assistant talking to the user. User info: {info}"

    # Store new memories if the user asks the model to remember
    if "remember" in last_message.content.lower():
        memory = "User name is Bob"
        memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})

    response = model.invoke([{"role": "system", "content": system_msg}] + messages)
    return response


# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
    inputs: list[BaseMessage],
    *,
    previous: list[BaseMessage],
    config: RunnableConfig,
    store: BaseStore,
):
    user_id = config["configurable"]["user_id"]
    previous = previous or []
    inputs = add_messages(previous, inputs)
    response = call_model(inputs, store, user_id).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

Note

If you're using LangGraph Cloud or LangGraph Studio, you do not need to pass the store to the entrypoint decorator, since it is done automatically.

Run the workflow!

Now, let's specify a user ID in the config and tell the model our name:

config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?

config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.

We can now inspect our in-memory store and verify that we have in fact saved the memories for the user:

for memory in in_memory_store.search(("memories", "1")):
    print(memory.value)
{'data': 'User name is Bob'}

Now let's run the workflow for another user to verify that the memories about the first user are self-contained:

config = {"configurable": {"thread_id": "3", "user_id": "2"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't have any information about your name. I can only see our current conversation without any prior context or personal details about you. If you'd like me to know your name, feel free to tell me!