如何添加跨线程持久化(功能 API)¶
LangGraph 允许您在 不同的 线程 之间持久化数据。例如,您可以将有关用户的信息(如姓名或偏好)存储在一个共享的(跨线程)内存中,并在新的线程(例如新的对话)中重用它们。
使用 功能 API 时,可以通过使用 Store 接口来设置存储和检索记忆:
-
创建一个
Store
的实例 -
将
store
实例传递给entrypoint()
装饰器,并在函数签名中暴露store
参数:
在本指南中,我们将展示如何构建并使用一个具有通过 Store 接口实现的共享内存的工作流。
Note
本指南中使用的 Store
API 的支持是在 LangGraph v0.2.32
中添加的。
本指南中使用的 Store
API 的 index 和 query 参数的支持是在 LangGraph v0.2.54
中添加的。
注意
如果您需要为 StateGraph
添加跨线程持久化,请查看这个 操作指南。
设置¶
首先,让我们安装所需的包并设置我们的 API 密钥
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
为 LangGraph 开发设置 LangSmith
注册 LangSmith 以快速发现并改进你的 LangGraph 项目的问题。LangSmith 允许你使用追踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 —— 了解更多如何入门的信息,请参阅 此处
示例:具有长期记忆的简单聊天机器人¶
IMG_PLACEHOLDER_1
API Reference: RetrievalQA | ChatOpenAI | FAISS | TextLoader | CharacterTextSplitter | VectorstoreIndexCreator
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAI
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.indexes import VectorstoreIndexCreator
# Load document and split in to chunks
loader = TextLoader("./state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
# Create the vector store index
index_creator = VectorstoreIndexCreator(
vectorstore_cls=FAISS,
embedding_cls=OpenAI(),
)
docsearch = index_creator.from_loaders([loader])
# Create the chain to answer questions
qa_chain = RetrievalQA.from_chain_type(
llm=ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0),
chain_type="stuff",
retriever=docsearch.as_retriever()
)
# Ask it a question
qa_chain.run("What did the president say about Congress?")
IMG_PLACEHOLDER_2
Notes¶
- This is a simple example that demonstrates how to use
VectorstoreIndexCreator
to create a vector store index from a document and then use it to answer questions. - The chatbot will remember the information from the document for future interactions.
- This is a very basic example and you can improve it by using more advanced techniques such as fine-tuning the model or using a more sophisticated vector store.
定义存储¶
在此示例中,我们将创建一个工作流,该工作流能够获取用户偏好的信息。我们将通过定义一个 InMemoryStore
来实现这一点——一个可以在内存中存储数据并查询这些数据的对象。
使用 Store
接口存储对象时,你需要定义两件事:
- 对象的命名空间,一个元组(类似于目录)
- 对象的键(类似于文件名)
在我们的示例中,我们将使用 ("memories", <user_id>)
作为命名空间,并为每个新记忆使用随机 UUID 作为键。
重要的是,为了确定用户,我们将在节点函数的 config 关键字参数中传递 user_id
。
让我们首先定义我们的存储!
API Reference: OpenAIEmbeddings
from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings
in_memory_store = InMemoryStore(
index={
"embed": OpenAIEmbeddings(model="text-embedding-3-small"),
"dims": 1536,
}
)
创建工作流¶
API Reference: ChatAnthropic | RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver
import uuid
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
model = ChatAnthropic(model="claude-3-5-sonnet-latest")
@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
namespace = ("memories", user_id)
last_message = messages[-1]
memories = memory_store.search(namespace, query=str(last_message.content))
info = "\n".join([d.value["data"] for d in memories])
system_msg = f"You are a helpful assistant talking to the user. User info: {info}"
# Store new memories if the user asks the model to remember
if "remember" in last_message.content.lower():
memory = "User name is Bob"
memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})
response = model.invoke([{"role": "system", "content": system_msg}] + messages)
return response
# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
inputs: list[BaseMessage],
*,
previous: list[BaseMessage],
config: RunnableConfig,
store: BaseStore,
):
user_id = config["configurable"]["user_id"]
previous = previous or []
inputs = add_messages(previous, inputs)
response = call_model(inputs, store, user_id).result()
return entrypoint.final(value=response, save=add_messages(inputs, response))
Note
如果你使用的是 LangGraph Cloud 或 LangGraph Studio,你 不需要 在入口点装饰器中传递 store,因为它会自动完成。
运行工作流!¶
现在,让我们在配置中指定一个用户ID,并告诉模型我们的名字:
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
================================== Ai Message ==================================
Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
我们现在可以检查我们的内存存储,并验证我们确实为用户保存了记忆:
现在让我们以另一个用户的身份运行工作流,以验证关于第一个用户的记忆是否是自包含的: