功能 API 概念¶
概述¶
功能 API 允许您在对现有代码进行最小改动的情况下,将 LangGraph 的关键功能 —— 持久化、记忆、人机协作 和 流式处理 —— 添加到您的应用中。
它被设计用于集成这些功能到现有的代码中,这些代码可能使用标准语言原语来进行分支和控制流,如 if
语句、for
循环和函数调用。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,功能 API 允许您在不强制执行严格执行模型的情况下引入这些能力。
功能 API 使用两个关键构建块:
@entrypoint
– 标记一个函数作为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。@task
– 表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点中异步执行。任务返回一个类似 future 的对象,可以等待或同步解析。
这为构建具有状态管理和流式处理的工作流提供了最小的抽象。
Tip
对于更倾向于声明式方法的用户,LangGraph 的 图 API 允许您使用图范式来定义工作流。两种 API 共享相同的底层运行时,因此可以在同一应用程序中一起使用它们。 请参阅 功能 API 与图 API 部分,比较这两种范式的差异。
示例¶
下面我们将演示一个简单的应用程序,该程序撰写一篇论文并中断以请求人工审核。
API Reference: MemorySaver | entrypoint | task | interrupt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
@task
def write_essay(topic: str) -> str:
"""根据给定主题撰写一篇论文。"""
time.sleep(1) # 用于表示一个长时间运行任务的占位符。
return f"关于主题的论文:{topic}"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""一个简单的流程,用于撰写论文并请求审核。"""
essay = write_essay("cat").result()
is_approved = interrupt({
# 作为参数传递给interrupt的任何可序列化为JSON的有效载荷。
# 当从工作流中流式传输数据时,它将在客户端显示为Interrupt。
"essay": essay, # 我们想要审核的论文。
# 我们可以添加任何额外的信息。
# 例如,引入一个名为"action"的键,并附带一些说明。
"action": "请批准或拒绝这篇论文",
})
return {
"essay": essay, # 生成的论文
"is_approved": is_approved, # 来自人工审核的响应
}
详细解释
此工作流将撰写一个关于“猫”的主题的论文,然后暂停以获取人工审核。工作流可以无限期地暂停,直到提供审核结果。
当工作流恢复时,它会从头开始执行,但由于write_essay
任务的结果已经保存,因此结果将从检查点加载,而不是重新计算。
import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
@task
def write_essay(topic: str) -> str:
"""根据给定主题撰写一篇论文。"""
time.sleep(1) # 这是一个长时间运行任务的占位符。
return f"关于主题的论文:{topic}"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""一个简单的流程,用于撰写论文并请求审核。"""
essay = write_essay("cat").result()
is_approved = interrupt({
# 作为参数传递给interrupt的任何可序列化为JSON的有效载荷。
# 当从工作流中流式传输数据时,它将在客户端显示为Interrupt。
"essay": essay, # 我们想要审核的论文。
# 我们可以添加任何额外的信息。
# 例如,引入一个名为"action"的键,并附带一些说明。
"action": "请批准或拒绝这篇论文",
})
return {
"essay": essay, # 生成的论文
"is_approved": is_approved, # 来自人工审核的响应
}
thread_id = str(uuid.uuid4())
config = {
"configurable": {
"thread_id": thread_id
}
}
for item in workflow.stream("cat", config):
print(item)
{'write_essay': '关于主题的论文:cat'}
{'__interrupt__': (Interrupt(value={'essay': '关于主题的论文:cat', 'action': '请批准或拒绝这篇论文'}, resumable=True, ns=['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'], when='during'),)}
已经撰写了一篇论文,准备进行审核。一旦提供了审核,我们可以恢复工作流:
from langgraph.types import Command
# 从用户处获取审核(例如通过UI)
# 在这种情况下,我们使用布尔值,但可以是任何可序列化为JSON的值。
human_review = True
for item in workflow.stream(Command(resume=human_review), config):
print(item)
工作流已完成,并且审核已添加到论文中。
入口点¶
@entrypoint
装饰器可用于从函数创建工作流。它封装了工作流逻辑并管理执行流程,包括处理 长时间运行的任务 和 中断。
定义¶
**入口点**通过使用 @entrypoint
装饰器装饰一个函数来定义。
该函数 必须接受一个位置参数,作为工作流输入。如果您需要传递多个数据项,请使用字典作为第一个参数的输入类型。
使用 entrypoint
装饰函数会生成一个 Pregel
实例,这有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。
通常您希望向 @entrypoint
装饰器传递一个 checkpointer 以启用持久化并使用诸如 human-in-the-loop 的功能。
序列化
入口点**的 **输入 和 输出 必须是 JSON 可序列化的,以支持检查点。有关详细信息,请参阅 序列化 部分。
可注入参数¶
在声明 entrypoint
时,您可以请求访问在运行时自动注入的附加参数。这些参数包括:
参数 | 描述 |
---|---|
previous | 访问给定线程的前一个 checkpoint 相关的状态。请参见 短期记忆。 |
store | BaseStore 的实例。适用于 长期记忆。 |
writer | 在使用 Async Python < 3.11 时,用于访问 StreamWriter。详情请参见 使用函数式 API 进行流式传输。 |
config | 用于访问运行时配置。更多信息请参见 RunnableConfig。 |
Important
使用适当的名称和类型注解声明参数。
请求可注入参数
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore(...) # 用于长期记忆的 InMemoryStore 实例
@entrypoint(
checkpointer=checkpointer, # 指定 checkpointer
store=in_memory_store # 指定 store
)
def my_workflow(
some_input: dict, # 输入(例如,通过 `invoke` 传递)
*,
previous: Any = None, # 用于短期记忆
store: BaseStore, # 用于长期记忆
writer: StreamWriter, # 用于流式传输自定义数据
config: RunnableConfig # 用于访问传递给入口点的配置
) -> ...:
执行¶
使用 @entrypoint
会产生一个 Pregel
对象,可以使用 invoke
、ainvoke
、stream
和 astream
方法进行执行。
恢复¶
可以在 [interrupt][langgraph.types.interrupt] 之后通过将 resume 值传递给 [Command][langgraph.types.Command] 原语来恢复执行。
错误后的恢复
要恢复错误后,使用相同的 thread id (config) 并传入 None
来运行 entrypoint
。
这假设底层 错误 已经解决,并且执行可以成功继续。
短期记忆¶
当使用 checkpointer
定义 entrypoint
时,它会在相同 thread id 的连续调用之间存储信息到 检查点 中。
这允许使用 previous
参数访问前一次调用的状态。
默认情况下,previous
参数是前一次调用的返回值。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
previous = previous or 0
return number + previous
config = {
"configurable": {
"thread_id": "some_thread_id"
}
}
my_workflow.invoke(1, config) # 1 (previous 为 None)
my_workflow.invoke(2, config) # 3 (previous 是前一次调用的 1)
entrypoint.final
¶
entrypoint.final 是可以从入口点返回的特殊原语,允许 解耦 存储在 检查点 中的值与 入口点的返回值。
第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解是 entrypoint.final[return_type, save_type]
。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
previous = previous or 0
# 这将返回 previous 值给调用者,同时将
# 2 * number 保存到检查点中,将在下一次调用中用于 `previous` 参数。
return entrypoint.final(value=previous, save=2 * number)
config = {
"configurable": {
"thread_id": "1"
}
}
my_workflow.invoke(3, config) # 0 (previous 为 None)
my_workflow.invoke(1, config) # 6 (previous 是前一次调用的 3 * 2)
任务¶
**任务**表示一个离散的工作单元,例如API调用或数据处理步骤。它有两个关键特性:
- 异步执行:任务设计为异步执行,允许多个操作并发运行而不会阻塞。
- 检查点:任务结果保存到检查点中,从而可以从中止的最后保存状态恢复工作流。(有关更多细节,请参见persistence)。
定义¶
使用@task
装饰器定义任务,该装饰器包装一个常规的Python函数。
API Reference: task
from langgraph.func import task
@task()
def slow_computation(input_value):
# 模拟一个长时间运行的操作
...
return result
序列化
**任务的输出**必须是JSON可序列化的,以支持检查点。
执行¶
任务**只能从**入口点、另一个**任务**或state graph节点中调用。
**任务**不能直接从主应用程序代码中调用。
当你调用一个**任务**时,它会立即返回一个未来对象。未来是一个用于结果的占位符,该结果将在稍后可用。
要获取**任务**的结果,你可以同步等待(使用result()
)或者异步等待(使用await
)。
何时使用任务¶
**任务**在以下场景中非常有用:
- 检查点(Checkpointing):当你需要将长时间运行操作的结果保存到检查点中,以便在恢复工作流时无需重新计算。
- 人机协作(Human-in-the-loop):如果你正在构建一个需要人工干预的工作流,你**必须**使用**任务**来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。有关更多细节,请参阅 确定性(determinism) 部分。
- 并行执行(Parallel Execution):对于 I/O 密集型任务,**任务**支持并行执行,允许多个操作并发运行而不会阻塞(例如调用多个 API)。
- 可观测性(Observability):将操作封装在**任务**中,可以通过 LangSmith 跟踪工作流的进度并监控单个操作的执行情况。
- 可重试的工作(Retryable Work):当需要重试工作以处理失败或不一致时,**任务**提供了一种封装和管理重试逻辑的方式。
序列化¶
在 LangGraph 中,序列化有两个关键方面:
@entrypoint
的输入和输出必须可以被 JSON 序列化。@task
的输出必须可以被 JSON 序列化。
这些要求对于启用检查点和工作流恢复是必要的。使用 Python 原语类型,如字典、列表、字符串、数字和布尔值,以确保你的输入和输出可以被序列化。
序列化确保了工作流状态(例如任务结果和中间值)可以被可靠地保存和恢复。这对于实现人机交互、容错性和并行执行至关重要。
如果提供不可序列化的输入或输出,在配置了检查点器的工作流中将会导致运行时错误。
确定性¶
要使用诸如 human-in-the-loop 之类的特性,任何随机性都应封装在 任务 中。这保证了当执行被暂停(例如,用于 human-in-the-loop)并恢复时,它将遵循相同的 步骤序列,即使 任务 的结果是非确定性的。
LangGraph 通过在执行过程中持久化 任务 和 子图 的结果来实现这种行为。一个设计良好的工作流确保恢复执行时遵循 相同的步骤序列,从而能够正确检索之前计算的结果,而无需重新执行它们。这对于长时间运行的 任务 或具有非确定性结果的 任务 来说尤其有用,因为它避免了重复之前的工作,并允许从本质上相同的状态继续执行。
尽管工作流的不同运行可能会产生不同的结果,但恢复一个 特定 的运行应该始终遵循相同的记录步骤序列。这使得 LangGraph 能够高效查找在图被中断之前已执行的 任务 和 子图 的结果,并避免重新计算它们。
幂等性¶
幂等性确保多次执行相同操作会产生相同的结果。这有助于防止因步骤重新运行而导致的重复 API 调用和冗余处理。始终将 API 调用放在 tasks 函数中以进行检查点记录,并设计它们在重新执行时也具有幂等性。如果 task 开始但未能成功完成,可能会发生重新执行。然后,如果工作流被恢复,task 将再次运行。使用幂等性键或验证现有结果以避免重复。
功能 API 与 图 API¶
功能 API 和 图 API(StateGraph) 提供了两种不同的范式,用于使用 LangGraph 创建应用程序。以下是一些关键区别:
- 控制流:功能 API 不需要考虑图结构。您可以使用标准的 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
- 短期记忆:图 API 需要声明一个 State,并且可能需要定义 reducers 来管理对图状态的更新。
@entrypoint
和@tasks
不需要显式的状态管理,因为它们的状态作用域限定在函数内,并且不会在函数之间共享。 - 检查点:两种 API 都生成并使用检查点。在 图 API 中,每次 superstep 之后都会生成一个新的检查点。而在 功能 API 中,当任务执行时,其结果将保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
- 可视化:图 API 使得将工作流作为图进行可视化变得容易,这对于调试、理解工作流以及与其他人的分享非常有用。功能 API 不支持可视化,因为图是在运行时动态生成的。
常见错误¶
处理副作用¶
将副作用(例如写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会重复执行。
在这个示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会再次执行。
在这个示例中,副作用被封装在任务中,从而保证了恢复时的一致性执行。
非确定性的控制流¶
那些每次可能返回不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保在恢复时能返回相同的结果。
- 在任务中:获取随机数(5)→ 中断 → 恢复 → 返回(5)→ ...
- 不在任务中:获取随机数(5)→ 中断 → 恢复 → 获取新的随机数(7)→ ...
当使用**人机协作**的工作流并包含多个中断调用时,这一点尤为重要。LangGraph 为每个任务/入口点保留了一个恢复值列表。当遇到中断时,它会与相应的恢复值匹配。 这种匹配是严格**基于索引的**,因此恢复值的顺序应该与中断的顺序一致。
如果在恢复时未保持执行顺序,一个 interrupt
调用可能会与错误的 resume
值匹配,导致结果错误。
请阅读关于 确定性 的部分以了解更多信息。
在这个示例中,工作流使用当前时间来决定执行哪个任务。这是非确定性的,因为工作流的结果取决于其执行的时间。
from langgraph.func import entrypoint
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
t1 = time.time()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}
在这个示例中,工作流使用输入 t0
来决定执行哪个任务。这是确定性的,因为工作流的结果仅取决于输入。
import time
from langgraph.func import task
@task
def get_time() -> float:
return time.time()
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
t1 = get_time().result()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}