添加人机协作环节¶
interrupt
¶
LangGraph 中的 [interrupt
函数][langgraph.types.interrupt] 通过在特定节点暂停图、向人类展示信息并使用其输入恢复图,实现了人机协作的工作流。它适用于审批、编辑或收集额外上下文等任务。
图通过一个提供人类响应的 [Command
][langgraph.types.Command] 对象来恢复执行。
API Reference: interrupt | Command
from langgraph.types import interrupt, Command
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
graph = graph_builder.compile(checkpointer=checkpointer) # (4)!
# Run the graph until the interrupt is hit.
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config) # (5)!
print(result['__interrupt__']) # (6)!
# > [
# > Interrupt(
# > value={'text_to_revise': 'original text'},
# > resumable=True,
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# > )
# > ]
print(graph.invoke(Command(resume="Edited text"), config=config)) # (7)!
# > {'some_text': 'Edited text'}
interrupt(...)
在human_node
中暂停执行,并将给定的负载呈现给人类。- 可以传递任何可序列化的 JSON 值到
interrupt
函数中。这里是一个包含待修改文本的字典。 - 恢复后,
interrupt(...)
的返回值是人类提供的输入,用于更新状态。 - 需要检查点(checkpointer)来持久化图的状态。在生产环境中,这应该具有耐久性(例如由数据库支持)。
- 图使用初始状态进行调用。
- 当图遇到中断时,它会返回一个带有负载和元数据的
Interrupt
对象。 - 使用
Command(resume=...)
来恢复图,注入人类的输入并继续执行。
扩展示例:使用 interrupt
from typing import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
some_text: str
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
checkpointer = InMemorySaver() # (4)!
graph = graph_builder.compile(checkpointer=checkpointer)
# Pass a thread ID to the graph to run it.
config = {"configurable": {"thread_id": uuid.uuid4()}}
# Run the graph until the interrupt is hit.
result = graph.invoke({"some_text": "original text"}, config=config) # (5)!
print(result['__interrupt__']) # (6)!
# > [
# > Interrupt(
# > value={'text_to_revise': 'original text'},
# > resumable=True,
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# > )
# > ]
print(graph.invoke(Command(resume="Edited text"), config=config)) # (7)!
# > {'some_text': 'Edited text'}
interrupt(...)
在human_node
中暂停执行,并将给定的负载呈现给人类。- 可以传递任何可序列化的 JSON 值到
interrupt
函数中。这里是一个包含待修改文本的字典。 - 恢复后,
interrupt(...)
的返回值是人类提供的输入,用于更新状态。 - 需要检查点(checkpointer)来持久化图的状态。在生产环境中,这应该具有耐久性(例如由数据库支持)。
- 图使用初始状态进行调用。
- 当图遇到中断时,它会返回一个带有负载和元数据的
Interrupt
对象。 - 使用
Command(resume=...)
来恢复图,注入人类的输入并继续执行。
新增于 0.4.0 版本
__interrupt__
是一个特殊键,当图被中断时运行图时会返回该键。从 0.4.0 版本开始,invoke
和 ainvoke
支持了 __interrupt__
。如果你使用的是旧版本,你只有在使用 stream
或 astream
时才能在结果中看到 __interrupt__
。你也可以使用 graph.get_state(thread_id)
来获取中断值。
Warning
中断功能既强大又易于使用。然而,尽管它们在开发者体验上可能类似于 Python 的 input() 函数,但需要注意的是,它们不会自动从中断点恢复执行。相反,它们会重新运行使用中断的整个节点。 因此,通常最好将中断放置在节点的开头或专用节点中。有关详细信息,请阅读 如何从中断恢复 部分。
要求¶
要在你的图中使用 interrupt
,你需要:
- 指定一个检查点器 以在每一步之后保存图的状态。
- 在适当的位置调用
interrupt()
。有关示例,请参见设计模式部分。 - 使用线程 ID 运行图,直到遇到
interrupt
。 - 使用
invoke
/ainvoke
/stream
/astream
恢复执行(参见Command
原语)。
设计模式¶
通常,你可以在人机协作工作流中执行三种不同的 操作:
- 批准或拒绝:在关键步骤(如API调用)之前暂停图,以审查并批准该操作。如果操作被拒绝,你可以阻止图执行该步骤,并可能采取其他行动。这种模式通常涉及根据用户的输入对图进行 路由。
- 编辑图状态:暂停图以审查和编辑图状态。这在纠正错误或使用额外信息更新状态时非常有用。这种模式通常涉及使用用户的输入来 更新 状态。
- 获取输入:在图的某个特定步骤中明确请求用户输入。这对于收集额外信息或上下文以指导代理的决策过程非常有用。
下面我们将展示可以使用这些 操作 实现的不同设计模式。
批准或拒绝¶
在关键步骤(如API调用)之前暂停图,以审查并批准该操作。如果操作被拒绝,你可以阻止图执行该步骤,并可能采取其他行动。
API Reference: interrupt | Command
from typing import Literal
from langgraph.types import interrupt, Command
def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
is_approved = interrupt(
{
"question": "Is this correct?",
# 显示需要用户审查和批准的输出。
"llm_output": state["llm_output"]
}
)
if is_approved:
return Command(goto="some_node")
else:
return Command(goto="another_node")
# 在适当的位置将节点添加到图中
# 并连接到相关节点。
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)
# 在运行图并遇到中断后,图将暂停。
# 使用批准或拒绝来恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
扩展示例:使用中断进行批准或拒绝
from typing import Literal, TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
# 定义共享的图状态
class State(TypedDict):
llm_output: str
decision: str
# 模拟LLM输出节点
def generate_llm_output(state: State) -> State:
return {"llm_output": "This is the generated output."}
# 人类批准节点
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:
decision = interrupt({
"question": "Do you approve the following output?",
"llm_output": state["llm_output"]
})
if decision == "approve":
return Command(goto="approved_path", update={"decision": "approved"})
else:
return Command(goto="rejected_path", update={"decision": "rejected"})
# 批准后的下一步
def approved_node(state: State) -> State:
print("✅ Approved path taken.")
return state
# 拒绝后的替代路径
def rejected_node(state: State) -> State:
print("❌ Rejected path taken.")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)
builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 运行直到中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# 输出:
# Interrupt(value={'question': 'Do you approve the following output?', 'llm_output': 'This is the generated output.'}, ...)
# 模拟使用人类输入恢复
# 要测试拒绝,请将 resume="approve" 替换为 resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)
有关更详细的示例,请参阅 如何审查工具调用。
审查与编辑状态¶
API Reference: interrupt
from langgraph.types import interrupt
def human_editing(state: State):
...
result = interrupt(
# 中断信息提供给客户端。
# 可以是任何可序列化的JSON值。
{
"task": "审查LLM的输出并进行必要的编辑。",
"llm_generated_summary": state["llm_generated_summary"]
}
)
# 用编辑后的文本更新状态
return {
"llm_generated_summary": result["edited_text"]
}
# 在适当的位置将节点添加到图中
# 并连接到相关节点。
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)
...
# 在运行图并遇到中断后,图将暂停。
# 使用编辑后的文本恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
Command(resume={"edited_text": "The edited text"}),
config=thread_config
)
扩展示例:使用中断编辑状态
from typing import TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
# 定义图状态
class State(TypedDict):
summary: str
# 模拟LLM摘要生成
def generate_summary(state: State) -> State:
return {
"summary": "The cat sat on the mat and looked at the stars."
}
# 人类编辑节点
def human_review_edit(state: State) -> State:
result = interrupt({
"task": "请审查并编辑生成的摘要(如有必要)。",
"generated_summary": state["summary"]
})
return {
"summary": result["edited_summary"]
}
# 模拟下游使用编辑后的摘要
def downstream_use(state: State) -> State:
print(f"✅ Using edited summary: {state['summary']}")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)
builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)
# 设置内存检查点以支持中断
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 调用图直到遇到中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
# 输出中断负载
print(result["__interrupt__"])
# 示例输出:
# Interrupt(
# value={
# 'task': 'Please review and edit the generated summary if necessary.',
# 'generated_summary': 'The cat sat on the mat and looked at the stars.'
# },
# resumable=True,
# ...
# )
# 使用人类编辑的输入恢复图
edited_summary = "The cat lay on the rug, gazing peacefully at the night sky."
resumed_result = graph.invoke(
Command(resume={"edited_summary": edited_summary}),
config=config
)
print(resumed_result)
审查工具调用¶
def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
# 我们将通过 Command(resume=<human_review>) 提供这个值
human_review = interrupt(
{
"question": "Is this correct?",
# 展示工具调用以供审查
"tool_call": tool_call
}
)
review_action, review_data = human_review
# 批准工具调用并继续
if review_action == "continue":
return Command(goto="run_tool")
# 手动修改工具调用然后继续
elif review_action == "update":
...
updated_msg = get_updated_msg(review_data)
# 记住,要修改现有消息,你需要传递一个具有匹配ID的消息。
return Command(goto="run_tool", update={"messages": [updated_message]})
# 提供自然语言反馈,然后将其返回给代理
elif review_action == "feedback":
...
feedback_msg = get_feedback_msg(review_data)
return Command(goto="call_llm", update={"messages": [feedback_msg]})
有关更详细的示例,请参阅 如何审查工具调用。
验证用户输入¶
如果你需要在图本身内验证用户提供的输入(而不是在客户端),可以通过在一个节点中使用多个中断调用来实现这一点。
API Reference: interrupt
from langgraph.types import interrupt
def human_node(state: State):
"""带有验证的人类节点。"""
question = "What is your age?"
while True:
answer = interrupt(question)
# 验证答案,如果答案无效,再次请求输入。
if not isinstance(answer, int) or answer < 0:
question = f"'{answer} is not a valid age. What is your age?"
answer = None
continue
else:
# 如果答案有效,我们可以继续。
break
print(f"The human in the loop is {answer} years old.")
return {
"age": answer
}
扩展示例:验证用户输入
from typing import TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
# 定义图状态
class State(TypedDict):
age: int
# 请求人类输入并验证它的节点
def get_valid_age(state: State) -> State:
prompt = "Please enter your age (must be a non-negative integer)."
while True:
user_input = interrupt(prompt)
# 验证输入
try:
age = int(user_input)
if age < 0:
raise ValueError("Age must be non-negative.")
break # 接收到有效输入
except (ValueError, TypeError):
prompt = f"'{user_input}' is not valid. Please enter a non-negative integer for age."
return {"age": age}
# 使用有效输入的节点
def report_age(state: State) -> State:
print(f"✅ Human is {state['age']} years old.")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)
builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)
# 创建带有内存检查点的图
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 运行图直到第一个中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"]) # 第一个提示:“请输入你的年龄...”
# 模拟无效输入(例如,字符串而不是整数)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"]) # 带有验证消息的后续提示
# 模拟第二次无效输入(例如,负数)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"]) # 再次尝试
# 提供有效输入
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result) # 应包含有效的年龄
使用 Command
原语恢复执行¶
当在图中使用 interrupt
函数时,执行会在该点暂停并等待用户输入。
要恢复执行,请使用 [Command
][langgraph.types.Command] 原语,可以通过 invoke
、ainvoke
、stream
或 astream
方法提供该原语。
对 interrupt
提供响应:
要继续执行,请使用 Command(resume=value)
传递用户的输入。图将从最初调用 interrupt(...)
的节点的开始处恢复执行。这一次,interrupt
函数将返回 Command(resume=value)
中提供的值,而不是再次暂停。
从中断中恢复是如何工作的?¶
Warning
从 interrupt
中恢复与 Python 的 input()
函数不同,执行会从 input()
函数被调用的确切位置继续。
使用 interrupt
的一个关键点是理解恢复的机制。当你在 interrupt
之后恢复执行时,图的执行将从**触发最后一个 interrupt
的图节点**的**开始**处重新开始。
**从节点的开头到 interrupt
的所有代码**都将被重新执行。
counter = 0
def node(state: State):
# 所有从节点开头到 interrupt 的代码将在图恢复时重新执行。
global counter
counter += 1
print(f"> Entered the node: {counter} 次")
# 暂停图并等待用户输入。
answer = interrupt()
print("counter 的值是:", counter)
...
当**恢复**图时,计数器将第二次递增,导致以下输出:
使用一次调用来恢复多个中断¶
如果你的任务队列中有多个中断,你可以使用 Command.resume
并提供一个将中断 ID 映射到恢复值的字典,从而通过一次 invoke
/ stream
调用来恢复多个中断。
例如,一旦你的图已经被中断(理论上多次)并且处于停滞状态:
resume_map = {
i.interrupt_id: f"human input for prompt {i.value}"
for i in parent.get_state(thread_config).interrupts
}
parent_graph.invoke(Command(resume=resume_map), config=thread_config)
常见错误¶
副作用¶
将具有副作用的代码(例如 API 调用)放在 interrupt
之后,以避免重复执行,因为每次节点恢复时都会重新触发这些操作。
当从 interrupt
恢复节点时,这段代码会再次执行 API 调用。
如果 API 调用不是幂等的或只是昂贵的,这可能会有问题。
作为函数调用子图¶
当以 函数 的形式调用子图时,父图 将从 调用子图的节点开始处 恢复执行(即 interrupt
被触发的地方)。同样地,子图 也会从 interrupt()
函数被调用的节点开始处 恢复执行。
例如,
def node_in_parent_graph(state: State):
some_code() # <-- 当子图恢复时,此代码将再次执行。
# 以函数的形式调用一个子图。
# 子图包含一个 `interrupt` 调用。
subgraph_result = subgraph.invoke(some_input)
...
扩展示例:父图和子图的执行流程
假设我们有一个包含三个节点的父图:
父图:node_1
→ node_2
(子图调用) → node_3
并且子图有三个节点,其中第二个节点包含一个 interrupt
:
子图:sub_node_1
→ sub_node_2
(interrupt
) → sub_node_3
在恢复图时,执行流程如下:
- 跳过父图中的
node_1
(已执行,图状态保存在快照中)。 - 从头开始重新执行父图中的
node_2
。 - 跳过子图中的
sub_node_1
(已执行,图状态保存在快照中)。 - 从头开始重新执行子图中的
sub_node_2
。 - 继续执行
sub_node_3
及后续节点。
下面是你可以用来理解中断如何与子图配合工作的简化示例代码。 它统计每个节点进入的次数并打印该次数。
import uuid
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
state_counter: int
counter_node_in_subgraph = 0
def node_in_subgraph(state: State):
"""子图中的一个节点。"""
global counter_node_in_subgraph
counter_node_in_subgraph += 1 # 此代码将**不会**再次运行!
print(f"总共进入了 `node_in_subgraph` {counter_node_in_subgraph} 次")
counter_human_node = 0
def human_node(state: State):
global counter_human_node
counter_human_node += 1 # 此代码将在恢复时再次运行!
print(f"总共进入了子图中的人类节点 {counter_human_node} 次")
answer = interrupt("你的名字是什么?")
print(f"得到了答案 {answer}")
checkpointer = MemorySaver()
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)
counter_parent_node = 0
def parent_node(state: State):
"""这个父节点将调用子图。"""
global counter_parent_node
counter_parent_node += 1 # 在恢复时此代码将再次运行!
print(f"总共进入了 `parent_node` {counter_parent_node} 次")
# 请注意,我们故意在图状态中增加状态计数器
# 以演示子图对相同键的更新不会与父图冲突(直到
subgraph_state = subgraph.invoke(state)
return subgraph_state
builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"state_counter": 1}, config):
print(chunk)
print('--- 恢复 ---')
for chunk in graph.stream(Command(resume="35"), config):
print(chunk)
这将输出:
总共进入了 `parent_node` 1 次
总共进入了 `node_in_subgraph` 1 次
总共进入了子图中的人类节点 1 次
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
总共进入了 `parent_node` 2 次
总共进入了子图中的人类节点 2 次
得到了答案 35
{'parent_node': {'state_counter': 1}}
使用多个中断¶
在 单个 节点中使用多个中断对于像 验证人类输入 这样的模式可能很有帮助。但是,如果处理不当,在同一个节点中使用多个中断可能导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会为正在执行该节点的任务保留一组特定的恢复值。每当执行恢复时,它会从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配的值。匹配是**严格基于索引的**,因此节点内中断调用的顺序至关重要。
为了避免问题,请避免在不同执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能导致索引不匹配。这些问题通常来自于非常规模式,如通过 Command(resume=..., update=SOME_STATE_MUTATION)
修改状态,或者依赖全局变量来动态修改节点的结构。
扩展示例:引入非确定性的不正确代码
import uuid
from typing import TypedDict, Optional
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
age: Optional[str]
name: Optional[str]
def human_node(state: State):
if not state.get('name'):
name = interrupt("你的名字是什么?")
else:
name = "N/A"
if not state.get('age'):
age = interrupt("你的年龄是多少?")
else:
age = "N/A"
print(f"姓名:{name}。年龄:{age}")
return {
"age": age,
"name": name,
}
builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"age": None, "name": None}, config):
print(chunk)
for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
print(chunk)