Skip to content

如何使用 NodeInterrupt 添加动态断点

Note

对于**人工介入**工作流,请使用新的 interrupt() 函数。有关使用 interrupt 的设计模式的更多信息,请查看 人工介入概念指南

先决条件

本指南假设你熟悉以下概念:

人工介入(HIL)交互对于智能体系统至关重要。断点是一种常见的 HIL 交互模式,它允许图在特定步骤停止,并在继续执行之前寻求人工批准(例如,对于敏感操作)。

在 LangGraph 中,你可以在节点执行之前/之后添加断点。但很多时候,根据某些条件从给定节点内部**动态**中断图的执行可能会很有帮助。这样做时,包含有关**为何**引发该中断的信息也可能会很有帮助。

本指南将展示如何使用 NodeInterrupt 动态中断图的执行 —— NodeInterrupt 是一种可以从节点内部抛出的特殊异常。让我们来看看它的实际应用吧!

环境设置

首先,让我们安装所需的软件包

%%capture --no-stderr
%pip install -U langgraph

为 LangGraph 开发设置 LangSmith

注册 LangSmith 以快速发现问题并提升你的 LangGraph 项目的性能。LangSmith 允许你使用跟踪数据来调试、测试和监控使用 LangGraph 构建的大语言模型应用程序 — 点击 此处 了解更多关于如何开始使用的信息。

定义图

from typing_extensions import TypedDict
from IPython.display import Image, display

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt


class State(TypedDict):
    input: str


def step_1(state: State) -> State:
    print("---Step 1---")
    return state


def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt
    # if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(
            f"Received input that is longer than 5 characters: {state['input']}"
        )

    print("---Step 2---")
    return state


def step_3(state: State) -> State:
    print("---Step 3---")
    return state


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

运行带有动态中断的图

首先,让我们使用长度小于等于 5 个字符的输入来运行该图。这应该可以安全地忽略我们定义的中断条件,并在图执行结束时返回原始输入。

initial_input = {"input": "hello"}
thread_config = {"configurable": {"thread_id": "1"}}

for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello'}
---Step 1---
{'input': 'hello'}
---Step 2---
{'input': 'hello'}
---Step 3---
{'input': 'hello'}
如果我们此时检查该图,就会发现没有更多任务需要运行,并且该图实际上已经完成执行。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
()
()
现在,让我们使用一个长度超过 5 个字符的输入来运行图。这应该会触发我们通过在 step_2 节点内抛出一个 NodeInterrupt 错误所定义的动态中断。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "2"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
我们可以看到,图在执行 step_2 时停止了。如果我们此时检查图的状态,就可以看到关于下一个要执行的节点(step_2)的信息,以及引发中断的节点(同样是 step_2)的信息,还有关于该中断的额外信息。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)
如果我们尝试从断点处恢复图的执行,由于输入和图的状态没有改变,我们将再次被中断。

# NOTE: to resume the graph from a dynamic interrupt we use the same syntax as with regular interrupts -- we pass None as the input
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)

更新图表状态

为了解决这个问题,我们可以采取以下几种方法。

首先,我们可以像一开始那样,在不同的线程上使用较短的输入来运行图。或者,如果我们想从断点处恢复图的执行,我们可以更新状态,使输入的长度小于 5 个字符(这是我们中断的条件)。

# NOTE: this update will be applied as of the last successful node before the interrupt, i.e. `step_1`, right before the node with an interrupt
graph.update_state(config=thread_config, values={"input": "foo"})
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 2---
{'input': 'foo'}
---Step 3---
{'input': 'foo'}
()
{'input': 'foo'}
你还可以**以节点 step_2**(中断节点)的身份更新状态,这样将完全跳过该节点。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "3"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

# NOTE: this update will skip the node `step_2` altogether
graph.update_state(config=thread_config, values=None, as_node="step_2")
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 3---
{'input': 'hello world'}
()
{'input': 'hello world'}

Comments