Skip to content

功能 API 概念

概述

功能 API 允许您在对现有代码进行最小改动的情况下,将 LangGraph 的关键功能 —— 持久化记忆人机协作流式处理 —— 添加到您的应用中。

它被设计用于集成这些功能到现有的代码中,这些代码可能使用标准语言原语来进行分支和控制流,如 if 语句、for 循环和函数调用。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,功能 API 允许您在不强制执行严格执行模型的情况下引入这些能力。

功能 API 使用两个关键构建块:

  • @entrypoint – 标记一个函数作为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • @task – 表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点中异步执行。任务返回一个类似 future 的对象,可以等待或同步解析。

这为构建具有状态管理和流式处理的工作流提供了最小的抽象。

Tip

对于更倾向于声明式方法的用户,LangGraph 的 图 API 允许您使用图范式来定义工作流。两种 API 共享相同的底层运行时,因此可以在同一应用程序中一起使用它们。 请参阅 功能 API 与图 API 部分,比较这两种范式的差异。

示例

下面我们将演示一个简单的应用程序,该程序撰写一篇论文并中断以请求人工审核。

API Reference: MemorySaver | entrypoint | task | interrupt

from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt


@task
def write_essay(topic: str) -> str:
    """根据给定主题撰写一篇论文。"""
    time.sleep(1) # 用于表示一个长时间运行任务的占位符。
    return f"关于主题的论文:{topic}"

@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的流程,用于撰写论文并请求审核。"""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # 作为参数传递给interrupt的任何可序列化为JSON的有效载荷。
        # 当从工作流中流式传输数据时,它将在客户端显示为Interrupt。
        "essay": essay, # 我们想要审核的论文。
        # 我们可以添加任何额外的信息。
        # 例如,引入一个名为"action"的键,并附带一些说明。
        "action": "请批准或拒绝这篇论文",
    })

    return {
        "essay": essay, # 生成的论文
        "is_approved": is_approved, # 来自人工审核的响应
    }
详细解释

此工作流将撰写一个关于“猫”的主题的论文,然后暂停以获取人工审核。工作流可以无限期地暂停,直到提供审核结果。

当工作流恢复时,它会从头开始执行,但由于write_essay任务的结果已经保存,因此结果将从检查点加载,而不是重新计算。

import time
import uuid

from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver

@task
def write_essay(topic: str) -> str:
    """根据给定主题撰写一篇论文。"""
    time.sleep(1) # 这是一个长时间运行任务的占位符。
    return f"关于主题的论文:{topic}"

@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的流程,用于撰写论文并请求审核。"""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # 作为参数传递给interrupt的任何可序列化为JSON的有效载荷。
        # 当从工作流中流式传输数据时,它将在客户端显示为Interrupt。
        "essay": essay, # 我们想要审核的论文。
        # 我们可以添加任何额外的信息。
        # 例如,引入一个名为"action"的键,并附带一些说明。
        "action": "请批准或拒绝这篇论文",
    })

    return {
        "essay": essay, # 生成的论文
        "is_approved": is_approved, # 来自人工审核的响应
    }

thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        "thread_id": thread_id
    }
}

for item in workflow.stream("cat", config):
    print(item)
{'write_essay': '关于主题的论文:cat'}
{'__interrupt__': (Interrupt(value={'essay': '关于主题的论文:cat', 'action': '请批准或拒绝这篇论文'}, resumable=True, ns=['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'], when='during'),)}

已经撰写了一篇论文,准备进行审核。一旦提供了审核,我们可以恢复工作流:

from langgraph.types import Command

# 从用户处获取审核(例如通过UI)
# 在这种情况下,我们使用布尔值,但可以是任何可序列化为JSON的值。
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
{'workflow': {'essay': '关于主题的论文:cat', 'is_approved': False}}

工作流已完成,并且审核已添加到论文中。

入口点

@entrypoint 装饰器可用于从函数创建工作流。它封装了工作流逻辑并管理执行流程,包括处理 长时间运行的任务中断

定义

**入口点**通过使用 @entrypoint 装饰器装饰一个函数来定义。

该函数 必须接受一个位置参数,作为工作流输入。如果您需要传递多个数据项,请使用字典作为第一个参数的输入类型。

使用 entrypoint 装饰函数会生成一个 Pregel 实例,这有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。

通常您希望向 @entrypoint 装饰器传递一个 checkpointer 以启用持久化并使用诸如 human-in-the-loop 的功能。

from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # 一些可能涉及 API 调用等长时间运行任务的逻辑,
    # 并且可能因 human-in-the-loop 而被中断。
    ...
    return result
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
async def my_workflow(some_input: dict) -> int:
    # 一些可能涉及 API 调用等长时间运行任务的逻辑,
    # 并且可能因 human-in-the-loop 而被中断。
    ...
    return result 

序列化

入口点**的 **输入输出 必须是 JSON 可序列化的,以支持检查点。有关详细信息,请参阅 序列化 部分。

可注入参数

在声明 entrypoint 时,您可以请求访问在运行时自动注入的附加参数。这些参数包括:

参数 描述
previous 访问给定线程的前一个 checkpoint 相关的状态。请参见 短期记忆
store BaseStore 的实例。适用于 长期记忆
writer 在使用 Async Python < 3.11 时,用于访问 StreamWriter。详情请参见 使用函数式 API 进行流式传输
config 用于访问运行时配置。更多信息请参见 RunnableConfig

Important

使用适当的名称和类型注解声明参数。

请求可注入参数
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore(...)  # 用于长期记忆的 InMemoryStore 实例

@entrypoint(
    checkpointer=checkpointer,  # 指定 checkpointer
    store=in_memory_store  # 指定 store
)  
def my_workflow(
    some_input: dict,  # 输入(例如,通过 `invoke` 传递)
    *,
    previous: Any = None, # 用于短期记忆
    store: BaseStore,  # 用于长期记忆
    writer: StreamWriter,  # 用于流式传输自定义数据
    config: RunnableConfig  # 用于访问传递给入口点的配置
) -> ...:

执行

使用 @entrypoint 会产生一个 Pregel 对象,可以使用 invokeainvokestreamastream 方法进行执行。

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}
my_workflow.invoke(some_input, config)  # 同步等待结果
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}
await my_workflow.ainvoke(some_input, config)  # 异步等待结果
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

for chunk in my_workflow.stream(some_input, config):
    print(chunk)
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

async for chunk in my_workflow.astream(some_input, config):
    print(chunk)

恢复

可以在 [interrupt][langgraph.types.interrupt] 之后通过将 resume 值传递给 [Command][langgraph.types.Command] 原语来恢复执行。

from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(Command(resume=some_resume_value), config)
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

await my_workflow.ainvoke(Command(resume=some_resume_value), config)
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

for chunk in my_workflow.stream(Command(resume=some_resume_value), config):
    print(chunk)
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

async for chunk in my_workflow.astream(Command(resume=some_resume_value), config):
    print(chunk)

错误后的恢复

要恢复错误后,使用相同的 thread id (config) 并传入 None 来运行 entrypoint

这假设底层 错误 已经解决,并且执行可以成功继续。

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(None, config)
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

await my_workflow.ainvoke(None, config)
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

for chunk in my_workflow.stream(None, config):
    print(chunk)
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

async for chunk in my_workflow.astream(None, config):
    print(chunk)

短期记忆

当使用 checkpointer 定义 entrypoint 时,它会在相同 thread id 的连续调用之间存储信息到 检查点 中。

这允许使用 previous 参数访问前一次调用的状态。

默认情况下,previous 参数是前一次调用的返回值。

@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
    previous = previous or 0
    return number + previous

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(1, config)  # 1 (previous 为 None)
my_workflow.invoke(2, config)  # 3 (previous 是前一次调用的 1)

entrypoint.final

entrypoint.final 是可以从入口点返回的特殊原语,允许 解耦 存储在 检查点 中的值与 入口点的返回值

第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解是 entrypoint.final[return_type, save_type]

@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # 这将返回 previous 值给调用者,同时将
    # 2 * number 保存到检查点中,将在下一次调用中用于 `previous` 参数。
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous 为 None)
my_workflow.invoke(1, config)  # 6 (previous 是前一次调用的 3 * 2)

任务

**任务**表示一个离散的工作单元,例如API调用或数据处理步骤。它有两个关键特性:

  • 异步执行:任务设计为异步执行,允许多个操作并发运行而不会阻塞。
  • 检查点:任务结果保存到检查点中,从而可以从中止的最后保存状态恢复工作流。(有关更多细节,请参见persistence)。

定义

使用@task装饰器定义任务,该装饰器包装一个常规的Python函数。

API Reference: task

from langgraph.func import task

@task()
def slow_computation(input_value):
    # 模拟一个长时间运行的操作
    ...
    return result

序列化

**任务的输出**必须是JSON可序列化的,以支持检查点。

执行

任务**只能从**入口点、另一个**任务**或state graph节点中调用。

**任务**不能直接从主应用程序代码中调用。

当你调用一个**任务**时,它会立即返回一个未来对象。未来是一个用于结果的占位符,该结果将在稍后可用。

要获取**任务**的结果,你可以同步等待(使用result())或者异步等待(使用await)。

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: int) -> int:
    future = slow_computation(some_input)
    return future.result()  # 同步等待结果
@entrypoint(checkpointer=checkpointer)
async def my_workflow(some_input: int) -> int:
    return await slow_computation(some_input)  # 异步等待结果

何时使用任务

**任务**在以下场景中非常有用:

  • 检查点(Checkpointing):当你需要将长时间运行操作的结果保存到检查点中,以便在恢复工作流时无需重新计算。
  • 人机协作(Human-in-the-loop):如果你正在构建一个需要人工干预的工作流,你**必须**使用**任务**来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。有关更多细节,请参阅 确定性(determinism) 部分。
  • 并行执行(Parallel Execution):对于 I/O 密集型任务,**任务**支持并行执行,允许多个操作并发运行而不会阻塞(例如调用多个 API)。
  • 可观测性(Observability):将操作封装在**任务**中,可以通过 LangSmith 跟踪工作流的进度并监控单个操作的执行情况。
  • 可重试的工作(Retryable Work):当需要重试工作以处理失败或不一致时,**任务**提供了一种封装和管理重试逻辑的方式。

序列化

在 LangGraph 中,序列化有两个关键方面:

  1. @entrypoint 的输入和输出必须可以被 JSON 序列化。
  2. @task 的输出必须可以被 JSON 序列化。

这些要求对于启用检查点和工作流恢复是必要的。使用 Python 原语类型,如字典、列表、字符串、数字和布尔值,以确保你的输入和输出可以被序列化。

序列化确保了工作流状态(例如任务结果和中间值)可以被可靠地保存和恢复。这对于实现人机交互、容错性和并行执行至关重要。

如果提供不可序列化的输入或输出,在配置了检查点器的工作流中将会导致运行时错误。

确定性

要使用诸如 human-in-the-loop 之类的特性,任何随机性都应封装在 任务 中。这保证了当执行被暂停(例如,用于 human-in-the-loop)并恢复时,它将遵循相同的 步骤序列,即使 任务 的结果是非确定性的。

LangGraph 通过在执行过程中持久化 任务子图 的结果来实现这种行为。一个设计良好的工作流确保恢复执行时遵循 相同的步骤序列,从而能够正确检索之前计算的结果,而无需重新执行它们。这对于长时间运行的 任务 或具有非确定性结果的 任务 来说尤其有用,因为它避免了重复之前的工作,并允许从本质上相同的状态继续执行。

尽管工作流的不同运行可能会产生不同的结果,但恢复一个 特定 的运行应该始终遵循相同的记录步骤序列。这使得 LangGraph 能够高效查找在图被中断之前已执行的 任务子图 的结果,并避免重新计算它们。

幂等性

幂等性确保多次执行相同操作会产生相同的结果。这有助于防止因步骤重新运行而导致的重复 API 调用和冗余处理。始终将 API 调用放在 tasks 函数中以进行检查点记录,并设计它们在重新执行时也具有幂等性。如果 task 开始但未能成功完成,可能会发生重新执行。然后,如果工作流被恢复,task 将再次运行。使用幂等性键或验证现有结果以避免重复。

功能 API 与 图 API

功能 API图 API(StateGraph) 提供了两种不同的范式,用于使用 LangGraph 创建应用程序。以下是一些关键区别:

  • 控制流:功能 API 不需要考虑图结构。您可以使用标准的 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
  • 短期记忆图 API 需要声明一个 State,并且可能需要定义 reducers 来管理对图状态的更新。@entrypoint@tasks 不需要显式的状态管理,因为它们的状态作用域限定在函数内,并且不会在函数之间共享。
  • 检查点:两种 API 都生成并使用检查点。在 图 API 中,每次 superstep 之后都会生成一个新的检查点。而在 功能 API 中,当任务执行时,其结果将保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
  • 可视化:图 API 使得将工作流作为图进行可视化变得容易,这对于调试、理解工作流以及与其他人的分享非常有用。功能 API 不支持可视化,因为图是在运行时动态生成的。

常见错误

处理副作用

将副作用(例如写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会重复执行。

在这个示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会再次执行。

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # 当恢复工作流时,这段代码将再次执行。
    # 这可能不是你想要的结果。
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value

在这个示例中,副作用被封装在任务中,从而保证了恢复时的一致性执行。

from langgraph.func import task

@task
def write_to_file():
    with open("output.txt", "w") as f:
        f.write("Side effect executed")

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # 现在副作用已被封装在任务中。
    write_to_file().result()
    value = interrupt("question")
    return value

非确定性的控制流

那些每次可能返回不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保在恢复时能返回相同的结果。

  • 在任务中:获取随机数(5)→ 中断 → 恢复 → 返回(5)→ ...
  • 不在任务中:获取随机数(5)→ 中断 → 恢复 → 获取新的随机数(7)→ ...

当使用**人机协作**的工作流并包含多个中断调用时,这一点尤为重要。LangGraph 为每个任务/入口点保留了一个恢复值列表。当遇到中断时,它会与相应的恢复值匹配。 这种匹配是严格**基于索引的**,因此恢复值的顺序应该与中断的顺序一致。

如果在恢复时未保持执行顺序,一个 interrupt 调用可能会与错误的 resume 值匹配,导致结果错误。

请阅读关于 确定性 的部分以了解更多信息。

在这个示例中,工作流使用当前时间来决定执行哪个任务。这是非确定性的,因为工作流的结果取决于其执行的时间。

from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    t0 = inputs["t0"]
    t1 = time.time()

    delta_t = t1 - t0

    if delta_t > 1:
        result = slow_task(1).result()
        value = interrupt("question")
    else:
        result = slow_task(2).result()
        value = interrupt("question")

    return {
        "result": result,
        "value": value
    }

在这个示例中,工作流使用输入 t0 来决定执行哪个任务。这是确定性的,因为工作流的结果仅取决于输入。

import time

from langgraph.func import task

@task
def get_time() -> float:
    return time.time()

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    t0 = inputs["t0"]
    t1 = get_time().result()

    delta_t = t1 - t0

    if delta_t > 1:
        result = slow_task(1).result()
        value = interrupt("question")
    else:
        result = slow_task(2).result()
        value = interrupt("question")

    return {
        "result": result,
        "value": value
    }