人工介入循环¶
本指南使用了新的 interrupt
函数。
从 LangGraph 0.2.57 版本开始,推荐使用 interrupt
函数 来设置断点,因为它简化了**人工介入循环**模式。
如果你正在寻找本概念指南的上一版本,该版本依赖于静态断点和 NodeInterrupt
异常,可在此处获取。
人工介入循环(或“循环内人工参与”)工作流将人工输入集成到自动化流程中,允许在关键阶段进行决策、验证或纠正。这在**基于大语言模型(LLM)的应用程序**中特别有用,因为底层模型可能偶尔会产生不准确的结果。在合规性、决策制定或内容生成等对错误容忍度较低的场景中,人工参与通过对模型输出进行审查、纠正或覆盖来确保可靠性。
用例¶
基于大语言模型(LLM)的应用中,**人工介入**工作流的主要用例包括:
- 🛠️ 审核工具调用:在工具执行之前,人工可以审核、编辑或批准大语言模型请求的工具调用。
- ✅ 验证大语言模型输出:人工可以审核、编辑或批准大语言模型生成的内容。
- 💡 提供上下文:允许大语言模型明确请求人工输入以进行澄清或获取更多细节,或支持多轮对话。
interrupt
¶
LangGraph 中的 interrupt
函数 可通过在特定节点暂停图、向人类展示信息并使用人类输入恢复图的执行,从而实现人工介入的工作流。此函数对于审批、编辑或收集额外输入等任务非常有用。interrupt
函数 与 Command
对象结合使用,以使用人类提供的值恢复图的执行。
from langgraph.types import interrupt
def human_node(state: State):
value = interrupt(
# 任何可进行 JSON 序列化的值,用于展示给人类。
# 例如,一个问题、一段文本或状态中的一组键
{
"text_to_revise": state["some_text"]
}
)
# 使用人类的输入更新状态,或根据输入路由图。
return {
"some_text": value
}
graph = graph_builder.compile(
checkpointer=checkpointer # `interrupt` 正常工作所需
)
# 运行图直到中断
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(some_input, config=thread_config)
# 使用人类的输入恢复图的执行
graph.invoke(Command(resume=value_from_human), config=thread_config)
Warning
中断功能强大且使用方便。然而,尽管在开发者体验方面它们可能类似于 Python 的 input()
函数,但需要注意的是,它们不会自动从中断点恢复执行。相反,它们会重新运行使用了中断的整个节点。
因此,中断通常最好放在节点的开头或专门的节点中。请阅读从中断处恢复执行部分以获取更多详细信息。
完整代码
如果你想查看代码的实际运行情况,以下是在图中使用 interrupt
的完整示例。
from typing import TypedDict
import uuid
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
"""图的状态。"""
some_text: str
def human_node(state: State):
value = interrupt(
# 任何可进行 JSON 序列化的值,用于展示给人类。
# 例如,一个问题、一段文本或状态中的一组键
{
"text_to_revise": state["some_text"]
}
)
return {
# 使用人类的输入更新状态
"some_text": value
}
# 构建图
graph_builder = StateGraph(State)
# 将 human-node 添加到图中
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
# `interrupt` 正常工作需要一个检查点器。
checkpointer = MemorySaver()
graph = graph_builder.compile(
checkpointer=checkpointer
)
# 向图传递一个线程 ID 以运行它。
thread_config = {"configurable": {"thread_id": uuid.uuid4()}}
# 使用 stream() 直接展示 `__interrupt__` 信息。
for chunk in graph.stream({"some_text": "Original text"}, config=thread_config):
print(chunk)
# 使用 Command 恢复执行
for chunk in graph.stream(Command(resume="Edited text"), config=thread_config):
print(chunk)
要求¶
要在你的图中使用 interrupt
,你需要:
- 指定一个检查点保存器,以在每一步之后保存图的状态。
- 在适当的位置调用
interrupt()
。有关示例,请参阅设计模式部分。 - 使用线程 ID 运行图,直到遇到
interrupt
。 - 使用
invoke
/ainvoke
/stream
/astream
恢复执行(请参阅Command
原语)。
设计模式¶
在有人在环的工作流中,通常可以执行三种不同的**操作**:
- 批准或拒绝:在关键步骤(如 API 调用)之前暂停图,以审查并批准该操作。如果操作被拒绝,可以阻止图执行该步骤,并可能采取替代操作。这种模式通常涉及根据人的输入对图进行**路由**。
- 编辑图状态:暂停图以审查并编辑图状态。这对于纠正错误或用额外信息更新状态很有用。这种模式通常涉及用人类的输入**更新**状态。
- 获取输入:在图的特定步骤明确请求人类输入。这对于收集额外信息或上下文以指导代理的决策过程或支持**多轮对话**很有用。
下面我们展示可以使用这些**操作**实现的不同设计模式。
批准或拒绝¶
在关键步骤(如 API 调用)之前暂停图,以审查并批准该操作。如果操作被拒绝,可以阻止图执行该步骤,并可能采取替代操作。
from typing import Literal
from langgraph.types import interrupt, Command
def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
is_approved = interrupt(
{
"question": "Is this correct?",
# Surface the output that should be
# reviewed and approved by the human.
"llm_output": state["llm_output"]
}
)
if is_approved:
return Command(goto="some_node")
else:
return Command(goto="another_node")
# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)
# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with either an approval or rejection.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
有关更详细的示例,请参阅如何审查工具调用。
审查并编辑状态¶
from langgraph.types import interrupt
def human_editing(state: State):
...
result = interrupt(
# Interrupt information to surface to the client.
# Can be any JSON serializable value.
{
"task": "Review the output from the LLM and make any necessary edits.",
"llm_generated_summary": state["llm_generated_summary"]
}
)
# Update the state with the edited text
return {
"llm_generated_summary": result["edited_text"]
}
# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)
...
# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the edited text.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
Command(resume={"edited_text": "The edited text"}),
config=thread_config
)
有关更详细的示例,请参阅如何使用中断等待用户输入。
审查工具调用¶
def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
# This is the value we'll be providing via Command(resume=<human_review>)
human_review = interrupt(
{
"question": "Is this correct?",
# Surface tool calls for review
"tool_call": tool_call
}
)
review_action, review_data = human_review
# Approve the tool call and continue
if review_action == "continue":
return Command(goto="run_tool")
# Modify the tool call manually and then continue
elif review_action == "update":
...
updated_msg = get_updated_msg(review_data)
# Remember that to modify an existing message you will need
# to pass the message with a matching ID.
return Command(goto="run_tool", update={"messages": [updated_message]})
# Give natural language feedback, and then pass that back to the agent
elif review_action == "feedback":
...
feedback_msg = get_feedback_msg(review_data)
return Command(goto="call_llm", update={"messages": [feedback_msg]})
有关更详细的示例,请参阅如何审查工具调用。
多轮对话¶
**多轮对话**涉及代理和人类之间的多次来回交互,这可以使代理以对话的方式从人类那里收集额外信息。
这种设计模式在由多个代理组成的大语言模型应用程序中很有用。一个或多个代理可能需要与人类进行多轮对话,在对话的不同阶段,人类提供输入或反馈。为简单起见,下面的代理实现被说明为单个节点,但实际上它可能是由多个节点组成的更大图的一部分,并且可能包含条件边。
在这种模式下,每个代理都有自己的人类节点来收集用户输入。这可以通过为人类节点命名唯一的名称(例如,“代理 1 的人类节点”,“代理 2 的人类节点”)或使用包含人类节点和代理节点的子图来实现。
from langgraph.types import interrupt
def human_input(state: State):
human_message = interrupt("human_input")
return {
"messages": [
{
"role": "human",
"content": human_message
}
]
}
def agent(state: State):
# Agent logic
...
graph_builder.add_node("human_input", human_input)
graph_builder.add_edge("human_input", "agent")
graph = graph_builder.compile(checkpointer=checkpointer)
# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the human's input.
graph.invoke(
Command(resume="hello!"),
config=thread_config
)
在这种模式下,单个人类节点用于为多个代理收集用户输入。活动代理由状态决定,因此在收集到人类输入后,图可以路由到正确的代理。
from langgraph.types import interrupt
def human_node(state: MessagesState) -> Command[Literal["agent_1", "agent_2", ...]]:
"""A node for collecting user input."""
user_input = interrupt(value="Ready for user input.")
# Determine the **active agent** from the state, so
# we can route to the correct agent after collecting input.
# For example, add a field to the state or use the last active agent.
# or fill in `name` attribute of AI messages generated by the agents.
active_agent = ...
return Command(
update={
"messages": [{
"role": "human",
"content": user_input,
}]
},
goto=active_agent,
)
有关更详细的示例,请参阅如何实现多轮对话。
验证人类输入¶
如果需要在图本身(而不是在客户端)验证人类提供的输入,可以通过在单个节点中使用多个中断调用来实现。
from langgraph.types import interrupt
def human_node(state: State):
"""Human node with validation."""
question = "What is your age?"
while True:
answer = interrupt(question)
# Validate answer, if the answer isn't valid ask for input again.
if not isinstance(answer, int) or answer < 0:
question = f"'{answer} is not a valid age. What is your age?"
answer = None
continue
else:
# If the answer is valid, we can proceed.
break
print(f"The human in the loop is {answer} years old.")
return {
"age": answer
}
Command
原语¶
使用 interrupt
函数时,图将在中断处暂停并等待用户输入。
可以使用 Command 原语恢复图的执行,该原语可以通过 invoke
、ainvoke
、stream
或 astream
方法传递。
Command
原语提供了几个选项,用于在恢复执行期间控制和修改图的状态:
-
向
interrupt
传递一个值:使用Command(resume=value)
向图提供数据,例如用户的响应。执行将从使用interrupt
的节点开始处恢复,不过,这次interrupt(...)
调用将返回Command(resume=value)
中传递的值,而不是暂停图的执行。 -
更新图的状态:使用
Command(update=update)
修改图的状态。请注意,恢复执行从使用interrupt
的节点开始处开始。执行将从使用interrupt
的节点开始处恢复,但使用更新后的状态。
通过利用 Command
,你可以恢复图的执行、处理用户输入并动态调整图的状态。
与 invoke
和 ainvoke
一起使用¶
当你使用 stream
或 astream
来运行图时,你会收到一个 Interrupt
事件,该事件会让你知道 interrupt
已被触发。
invoke
和 ainvoke
不会返回中断信息。要访问此信息,你必须在调用 invoke
或 ainvoke
后使用 get_state 方法来检索图的状态。
# 运行图直到中断
result = graph.invoke(inputs, thread_config)
# 获取图的状态以获取中断信息。
state = graph.get_state(thread_config)
# 打印状态值
print(state.values)
# 打印待处理任务
print(state.tasks)
# 使用用户输入恢复图。
graph.invoke(Command(resume={"age": "25"}), thread_config)
{'foo': 'bar'} # 状态值
(
PregelTask(
id='5d8ffc92-8011-0c9b-8b59-9d3545b7e553',
name='node_foo',
path=('__pregel_pull', 'node_foo'),
error=None,
interrupts=(Interrupt(value='value_in_interrupt', resumable=True, ns=['node_foo:5d8ffc92-8011-0c9b-8b59-9d3545b7e553'], when='during'),), state=None,
result=None
),
) # 待处理任务。中断信息
从中断处恢复执行是如何工作的?¶
警告
从 interrupt
处恢复执行与 Python 的 input()
函数 不同,在 input()
函数中,执行会从调用 input()
函数的确切位置继续。
使用 interrupt
的一个关键方面是理解恢复执行的工作原理。当你在 interrupt
之后恢复执行时,图执行会从触发最后一个 interrupt
的 图节点 的 起始处 开始。
从节点起始处到 interrupt
之间的 所有 代码都将重新执行。
counter = 0
def node(state: State):
# 当图恢复执行时,从节点起始处到中断处的所有代码都将重新执行。
global counter
counter += 1
print(f"> 进入节点: {counter} 次")
# 暂停图并等待用户输入。
answer = interrupt()
print("计数器的值为:", counter)
...
在图 恢复执行 时,计数器将再次递增,从而产生以下输出:
常见陷阱¶
副作用¶
将具有副作用的代码(如 API 调用)放在 interrupt
之后,以避免重复执行,因为每次节点恢复时,这些代码都会重新触发。
当节点从 interrupt
恢复时,此代码将再次执行 API 调用。
如果 API 调用不是幂等的,或者成本较高,这可能会产生问题。
作为函数调用的子图¶
当以函数形式调用子图时,**父图**将从调用子图(并触发了 interrupt
)的**节点开头**恢复执行。同样,**子图**将从调用 interrupt()
函数的**节点开头**恢复执行。
例如,
def node_in_parent_graph(state: State):
some_code() # <-- 当子图恢复时,这部分代码将重新执行。
# 以函数形式调用子图。
# 子图中包含一个 `interrupt` 调用。
subgraph_result = subgraph.invoke(some_input)
...
示例:父图和子图的执行流程
假设我们有一个包含 3 个节点的父图:
父图:node_1
→ node_2
(子图调用) → node_3
子图有 3 个节点,其中第二个节点包含一个 interrupt
:
子图:sub_node_1
→ sub_node_2
(interrupt
) → sub_node_3
当恢复图的执行时,执行流程如下:
- 跳过父图中的
node_1
(已执行,图状态已保存在快照中)。 - 从开头重新执行父图中的
node_2
。 - 跳过子图中的
sub_node_1
(已执行,图状态已保存在快照中)。 - 从开头重新执行子图中的
sub_node_2
。 - 继续执行
sub_node_3
及后续节点。
以下是一个简化的示例代码,可帮助你理解子图与中断的工作原理。 它会统计每个节点被进入的次数并打印该计数。
import uuid
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
state_counter: int
counter_node_in_subgraph = 0
def node_in_subgraph(state: State):
"""子图中的一个节点。"""
global counter_node_in_subgraph
counter_node_in_subgraph += 1 # 这部分代码**不会**再次运行!
print(f"总共进入 `node_in_subgraph` {counter_node_in_subgraph} 次")
counter_human_node = 0
def human_node(state: State):
global counter_human_node
counter_human_node += 1 # 这部分代码会再次运行!
print(f"总共进入子图中的 human_node {counter_human_node} 次")
answer = interrupt("你叫什么名字?")
print(f"得到的答案是 {answer}")
checkpointer = MemorySaver()
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)
counter_parent_node = 0
def parent_node(state: State):
"""此父节点将调用子图。"""
global counter_parent_node
counter_parent_node += 1 # 恢复时,这部分代码会再次运行!
print(f"总共进入 `parent_node` {counter_parent_node} 次")
# 请注意,我们有意在图状态中增加状态计数器
# 以证明子图对同一键的更新不会与父图冲突(直到
subgraph_state = subgraph.invoke(state)
return subgraph_state
builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"state_counter": 1}, config):
print(chunk)
print('--- 恢复执行 ---')
for chunk in graph.stream(Command(resume="35"), config):
print(chunk)
输出结果如下
总共进入 `parent_node` 1 次
总共进入 `node_in_subgraph` 1 次
总共进入子图中的 human_node 1 次
{'__interrupt__': (Interrupt(value='你叫什么名字?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- 恢复执行 ---
总共进入 `parent_node` 2 次
总共进入子图中的 human_node 2 次
得到的答案是 35
{'parent_node': {'state_counter': 1}}
使用多个中断¶
在**单个**节点中使用多个中断对于验证人工输入等模式可能会有帮助。但是,如果处理不当,在同一节点中使用多个中断可能会导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会为执行该节点的任务维护一个恢复值列表。每当执行恢复时,都会从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配的值。匹配是**严格基于索引的**,因此节点内中断调用的顺序至关重要。
为避免出现问题,请勿在执行过程中动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能会导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION)
改变状态,或依赖全局变量动态修改节点结构。
错误代码示例
import uuid
from typing import TypedDict, Optional
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
age: Optional[str]
name: Optional[str]
def human_node(state: State):
if not state.get('name'):
name = interrupt("你叫什么名字?")
else:
name = "N/A"
if not state.get('age'):
age = interrupt("你多大了?")
else:
age = "N/A"
print(f"姓名: {name}。年龄: {age}")
return {
"age": age,
"name": name,
}
builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"age": None, "name": None}, config):
print(chunk)
for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
print(chunk)
额外资源 📚¶
- 概念指南:持久化:阅读持久化指南,了解更多关于重放的背景信息。
- 操作指南:人在回路:学习如何在 LangGraph 中实现人在回路的工作流。
- 如何实现多轮对话:学习如何在 LangGraph 中实现多轮对话。