Skip to content

如何使用 NodeInterrupt 动态添加断点

Note

对于**人机协作**的工作流,请使用新的interrupt()函数。请参阅人机协作概念指南以获取有关使用interrupt的设计模式的更多信息。

前提条件

本指南假设您熟悉以下概念:

人机协作(HIL)交互对于代理系统至关重要。断点是一种常见的HIL交互模式,允许图在特定步骤停止并寻求人类批准后再继续(例如,对于敏感操作)。

在LangGraph中,您可以在节点执行前/后添加断点。但很多时候,根据某些条件从给定节点内部动态中断图可能是有用的。在这种情况下,包括中断的原因信息也可能很有帮助。

本指南将展示如何使用NodeInterrupt动态中断图——这是一个可以从节点内部抛出的特殊异常。让我们看看它的实际应用!

环境搭建

首先,让我们安装所需的包

%%capture --no-stderr
%pip install -U langgraph

使用LangSmith进行LangGraph开发

注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里

定义图

from typing_extensions import TypedDict
from IPython.display import Image, display

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt


class State(TypedDict):
    input: str


def step_1(state: State) -> State:
    print("---Step 1---")
    return state


def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt
    # if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(
            f"Received input that is longer than 5 characters: {state['input']}"
        )

    print("---Step 2---")
    return state


def step_3(state: State) -> State:
    print("---Step 3---")
    return state


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

API Reference: StateGraph | START | END | MemorySaver

使用动态中断运行图

首先,让我们使用一个长度小于等于5个字符的输入来运行图。这应该会安全地忽略我们定义的中断条件,并在图执行结束时返回原始输入。

initial_input = {"input": "hello"}
thread_config = {"configurable": {"thread_id": "1"}}

for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello'}
---Step 1---
{'input': 'hello'}
---Step 2---
{'input': 'hello'}
---Step 3---
{'input': 'hello'}
如果我们此时检查图形,可以看到没有剩余的任务需要运行,并且图形确实完成了执行。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
()
()
现在,让我们使用一个长度超过5个字符的输入来运行该图。这应该会触发我们通过在step_2节点内部抛出NodeInterrupt错误所定义的动态中断。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "2"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
我们可以看到,图形在执行step_2时停止了。如果我们检查此时的图形状态,可以看到关于下一个将要执行的节点(step_2)的信息,以及哪个节点引发了中断(同样是step_2),还有关于中断的其他相关信息。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)
如果我们尝试从断点处恢复图形,由于我们的输入和图形状态没有发生变化,我们将再次中断。

# NOTE: to resume the graph from a dynamic interrupt we use the same syntax as with regular interrupts -- we pass None as the input
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)

更新图表状态

为了解决这个问题,我们可以采取几种措施。

首先,我们可以简单地在一个不同的线程上运行图形,并使用较短的输入,就像我们在开始时所做的那样。或者,如果我们希望从断点处恢复图形执行,可以更新状态,使其输入长度短于5个字符(这是我们的中断条件)。

# NOTE: this update will be applied as of the last successful node before the interrupt, i.e. `step_1`, right before the node with an interrupt
graph.update_state(config=thread_config, values={"input": "foo"})
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 2---
{'input': 'foo'}
---Step 3---
{'input': 'foo'}
()
{'input': 'foo'}
您也可以更新状态 作为节点 step_2(中断节点),这样会完全跳过该节点。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "3"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

# NOTE: this update will skip the node `step_2` altogether
graph.update_state(config=thread_config, values=None, as_node="step_2")
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 3---
{'input': 'hello world'}
()
{'input': 'hello world'}

Comments