使用函数式 API¶
创建一个简单的流程¶
当定义 entrypoint
时,输入被限制为函数的第一个参数。要传递多个输入,可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = inputs["value"]
another_value = inputs["another_value"]
...
my_workflow.invoke({"value": 1, "another_value": 2})
扩展示例:简单流程
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
# 检查数字是否为偶数的任务
@task
def is_even(number: int) -> bool:
return number % 2 == 0
# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
return "The number is even." if is_even else "The number is odd."
# 创建用于持久化的 checkpointer
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
"""对数字进行分类的简单流程。"""
even = is_even(inputs["number"]).result()
return format_message(even).result()
# 使用唯一的线程 ID 运行流程
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
扩展示例:使用 LLM 撰写文章
此示例演示了如何语法上使用 @task
和 @entrypoint
装饰器。
由于提供了 checkpointer,流程结果将被保存在 checkpointer 中。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
llm = init_chat_model('openai:gpt-3.5-turbo')
# 任务:使用 LLM 生成文章
@task
def compose_essay(topic: str) -> str:
"""根据给定主题生成一篇文章。"""
return llm.invoke([
{"role": "system", "content": "你是一个撰写文章的有用助手。"},
{"role": "user", "content": f"写一篇关于 {topic} 的文章。"}
]).content
# 创建用于持久化的 checkpointer
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
"""使用 LLM 生成文章的简单流程。"""
return compose_essay(topic).result()
# 执行流程
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke("the history of flight", config=config)
print(result)
平行执行¶
通过并发调用任务并等待结果,可以实现任务的平行执行。这对于提高 I/O 密集型任务(例如为 LLM 调用 API)的性能非常有用。
@task
def add_one(number: int) -> int:
return number + 1
@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
futures = [add_one(i) for i in numbers]
return [f.result() for f in futures]
扩展示例:并行 LLM 调用
此示例演示了如何使用 @task
来并行运行多个 LLM 调用。每个调用生成一个关于不同主题的段落,并将结果合并成一个文本输出。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
# 初始化 LLM 模型
llm = init_chat_model("openai:gpt-3.5-turbo")
# 生成关于给定主题的段落的任务
@task
def generate_paragraph(topic: str) -> str:
response = llm.invoke([
{"role": "system", "content": "你是一个乐于助人的助手,能够撰写教育性的段落。"},
{"role": "user", "content": f"写一个关于 {topic} 的段落。"}
])
return response.content
# 创建用于持久化的 checkpointer
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
"""并行生成多个段落并将其合并。"""
futures = [generate_paragraph(topic) for topic in topics]
paragraphs = [f.result() for f in futures]
return "\n\n".join(paragraphs)
# 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
此示例利用了 LangGraph 的并发模型来提升执行时间,尤其是在涉及 I/O 的任务中,如 LLM 完成操作。
调用图¶
功能 API 和 图 API 可以在同一个应用程序中一起使用,因为它们共享相同的底层运行时。
API Reference: entrypoint | StateGraph
from langgraph.func import entrypoint
from langgraph.graph import StateGraph
builder = StateGraph()
...
some_graph = builder.compile()
@entrypoint()
def some_workflow(some_input: dict) -> int:
# 调用使用图 API 定义的图
result_1 = some_graph.invoke(...)
# 调用另一个使用图 API 定义的图
result_2 = another_graph.invoke(...)
return {
"result_1": result_1,
"result_2": result_2
}
扩展示例:从功能 API 调用一个简单图
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
# 定义共享状态类型
class State(TypedDict):
foo: int
# 定义一个简单的转换节点
def double(state: State) -> State:
return {"foo": state["foo"] * 2}
# 使用图 API 构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()
# 定义功能 API 工作流
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
result = graph.invoke({"foo": x})
return {"bar": result["foo"]}
# 执行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke(5, config=config)) # 输出: {'bar': 10}
调用其他入口点¶
你可以在一个 入口点 或 任务 中调用其他 入口点。
@entrypoint() # 将自动使用父入口点的 checkpointer
def some_other_workflow(inputs: dict) -> int:
return inputs["value"]
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = some_other_workflow.invoke({"value": 1})
return value
扩展示例:调用另一个入口点
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
# 初始化一个 checkpointer
checkpointer = MemorySaver()
# 可重复使用的子工作流,用于乘法运算
@entrypoint()
def multiply(inputs: dict) -> int:
return inputs["a"] * inputs["b"]
# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
return {"product": result}
# 执行主工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(main.invoke({"x": 6, "y": 7}, config=config)) # 输出: {'product': 42}
流式传输¶
功能 API 使用与 图 API 相同的流式传输机制。请阅读 流式传输指南 以获取更多详细信息。
使用流式传输 API 同时流式传输更新和自定义数据的示例。
API Reference: entrypoint | MemorySaver | get_stream_writer
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.config import get_stream_writer # (1)!
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
writer = get_stream_writer() # (2)!
writer("Started processing") # (3)!
result = inputs["x"] * 2
writer(f"Result is {result}") # (4)!
return result
config = {"configurable": {"thread_id": "abc"}}
for mode, chunk in main.stream( # (5)!
{"x": 5},
stream_mode=["custom", "updates"], # (6)!
config=config
):
print(f"{mode}: {chunk}")
- 从
langgraph.config
导入get_stream_writer
。 - 在入口点内获取一个流式写入器实例。
- 在计算开始前发出自定义数据。
- 在计算结果后发出另一个自定义消息。
- 使用
.stream()
处理流式输出。 - 指定要使用的流式传输模式。
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Python < 3.11 的异步处理
如果使用 Python < 3.11 并编写异步代码,使用 get_stream_writer()
将无法正常工作。请直接使用 StreamWriter
类。有关详细信息,请参阅 Python < 3.11 的异步处理。
重试策略¶
API Reference: MemorySaver | entrypoint | task | RetryPolicy
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy
# 此变量仅用于演示目的,以模拟网络故障。
# 这不是你在实际代码中会拥有的内容。
attempts = 0
# 让我们配置 RetryPolicy 在遇到 ValueError 时进行重试。
# 默认的 RetryPolicy 是针对特定网络错误进行重试的。
retry_policy = RetryPolicy(retry_on=ValueError)
@task(retry_policy=retry_policy)
def get_info():
global attempts
attempts += 1
if attempts < 2:
raise ValueError('Failure')
return "OK"
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
return get_info().result()
config = {
"configurable": {
"thread_id": "1"
}
}
main.invoke({'any_input': 'foobar'}, config=config)
缓存任务¶
API Reference: entrypoint | task
import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy
@task(cache_policy=CachePolicy(ttl=120)) # (1)!
def slow_add(x: int) -> int:
time.sleep(1)
return x * 2
@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
result1 = slow_add(inputs["x"]).result()
result2 = slow_add(inputs["x"]).result()
return {"result1": result1, "result2": result2}
for chunk in main.stream({"x": 5}, stream_mode="updates"):
print(chunk)
#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
ttl
以秒为单位指定。缓存在此时间后将失效。
出错后的恢复¶
API Reference: MemorySaver | entrypoint | task | StreamWriter
import time
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
# 该变量仅用于演示目的,以模拟网络故障。
# 这不是你在实际代码中会拥有的内容。
attempts = 0
@task()
def get_info():
"""
模拟一个在成功前失败一次的任务。
第一次尝试时抛出异常,之后的尝试返回"OK"。
"""
global attempts
attempts += 1
if attempts < 2:
raise ValueError("Failure") # 模拟第一次尝试时的失败
return "OK"
# 初始化内存检查点以实现持久化
checkpointer = MemorySaver()
@task
def slow_task():
"""
通过引入1秒的延迟来模拟一个运行缓慢的任务。
"""
time.sleep(1)
return "Ran slow task."
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
"""
主工作流函数,按顺序运行slow_task和get_info任务。
参数:
- inputs:包含工作流输入值的字典。
- writer:用于流式传输自定义数据的StreamWriter。
工作流首先执行`slow_task`,然后尝试执行`get_info`,
它将在首次调用时失败。
"""
slow_task_result = slow_task().result() # 阻塞调用slow_task
get_info().result() # 第一次尝试时在此处抛出异常
return slow_task_result
# 带有唯一线程标识符的工作流执行配置
config = {
"configurable": {
"thread_id": "1" # 用于跟踪工作流执行的唯一标识符
}
}
# 此调用将由于slow_task的执行而花费约1秒
try:
# 第一次调用将由于`get_info`任务失败而引发异常
main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
pass # 优雅地处理失败
当我们恢复执行时,我们不需要重新运行slow_task
,因为其结果已经保存在检查点中。
人机协作¶
功能 API 使用 interrupt
函数和 Command
原语支持 人机协作 工作流。
请参阅以下示例以获取更多详细信息:
- 如何等待用户输入(功能 API): 展示如何使用功能 API 实现一个简单的人机协作工作流。
- 如何审查工具调用(功能 API): 指南演示了如何在 ReAct 代理中使用 LangGraph 功能 API 实现人机协作工作流。
短期记忆¶
短期记忆允许在相同 thread id 的不同 调用 之间存储信息。更多细节请参见 短期记忆。
将返回值与保存的值解耦¶
使用 entrypoint.final
来将返回给调用者的值与保存在检查点中的值解耦。这在以下情况下非常有用:
- 您希望返回一个计算结果(例如摘要或状态),但保存一个不同的内部值,供下一次调用使用。
- 您需要控制下次运行时传递给前一个参数的值。
API Reference: entrypoint | MemorySaver
from typing import Optional
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: Optional[int]) -> entrypoint.final[int, int]:
previous = previous or 0
total = previous + n
# 返回 *previous* 值给调用者,但将 *new* 总和保存到检查点中。
return entrypoint.final(value=previous, save=total)
config = {"configurable": {"thread_id": "my-thread"}}
print(accumulate.invoke(1, config=config)) # 0
print(accumulate.invoke(2, config=config)) # 1
print(accumulate.invoke(3, config=config)) # 3
聊天机器人示例¶
使用函数式 API 和 MemorySaver
检查点的一个简单聊天机器人的示例。
该机器人能够记住之前的对话并从中断的地方继续。
API Reference: BaseMessage | add_messages | entrypoint | task | MemorySaver | ChatAnthropic
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-3-5-sonnet-latest")
@task
def call_model(messages: list[BaseMessage]):
response = model.invoke(messages)
return response
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
if previous:
inputs = add_messages(previous, inputs)
response = call_model(inputs).result()
return entrypoint.final(value=response, save=add_messages(inputs, response))
config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
扩展示例:构建一个简单的聊天机器人
如何添加线程级别的持久化(函数式 API): 展示了如何向函数式 API 工作流添加线程级别持久化,并实现一个简单的聊天机器人。
长期记忆¶
长期记忆 允许在不同的 thread id 之间存储信息。这在一次对话中学习某个用户的信息,并在另一次对话中使用时会很有用。
扩展示例:添加长期记忆
如何为功能型 API 添加跨线程持久化: 展示了如何为功能型 API 工作流添加跨线程持久化,并实现一个简单的聊天机器人。
工作流¶
- 工作流和代理 指南,了解如何使用函数式 API 构建工作流的更多示例。
代理¶
- 从零开始创建代理(函数式 API): 展示如何使用函数式 API 从零开始创建一个简单的代理。
- 如何构建多代理网络: 展示如何使用函数式 API 构建多代理网络。
- 如何在多代理应用中添加多轮对话(函数式 API): 允许最终用户与一个或多个代理进行多轮对话。
与其他库集成¶
- 使用函数式 API 将 LangGraph 的功能添加到其他框架中: 将 LangGraph 的持久化、记忆和流式处理等功能添加到那些本身不提供这些功能的代理框架中。