
How to wait for user input using interrupt

Prerequisites

This guide assumes familiarity with the following concepts:

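Human-in-the-loop (HIL) interactions are crucial for agentic systems. Waiting for user input is a common HIL interaction pattern: it lets the agent ask the user clarifying questions and wait for the answer before proceeding.

We can implement this in LangGraph using the interrupt() function. interrupt lets us pause graph execution to collect input from a user, then continue execution with the collected input.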

Setup

First, we need to install the required packages.

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_anthropic

Next, we need to set API keys for Anthropic and/or OpenAI (the LLMs we will use).

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
ANTHROPIC_API_KEY:  ········

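Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor LLM apps built with LangGraph. Read more about how to get started here.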

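Simple Usage

Let's explore a basic example of using human feedback. A straightforward approach is to create a node, human_feedback, dedicated to collecting user input. This lets us gather feedback at a specific, chosen point in the graph.

Steps:

  1. Call interrupt() inside the human_feedback node.
  2. Set up a checkpointer to save the state of the graph up until this node.
  3. Use Command(resume=...) to provide the requested value to the human_feedback node and resume execution.
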
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str
    user_feedback: str


def step_1(state):
    print("---Step 1---")
    pass


def human_feedback(state):
    print("---human_feedback---")
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Run until the interrupt raised inside human_feedback:

# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")
---Step 1---
{'step_1': None}


---human_feedback---
{'__interrupt__': (Interrupt(value='Please provide feedback:', resumable=True, ns=['human_feedback:e9a51d27-22ed-8c01-3f17-0ed33209b554'], when='during'),)}
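
The interrupt surfaces in the stream as an event under the `__interrupt__` key, holding a tuple of objects whose `value` is the prompt passed to interrupt(). A minimal sketch of pulling those prompts out of the streamed events follows. The helper name `pending_prompts` is hypothetical, and the events are stand-ins shaped like the output above rather than real LangGraph objects.

```python
from types import SimpleNamespace


def pending_prompts(events):
    """Collect the values of all interrupts found in streamed update events."""
    prompts = []
    for event in events:
        # In "updates" mode an interrupt appears under the "__interrupt__" key
        # as a tuple of objects that carry the prompt in `.value`.
        for item in event.get("__interrupt__", ()):
            prompts.append(item.value)
    return prompts


# Stand-in events shaped like the printed output (not real LangGraph objects).
events = [
    {"step_1": None},
    {"__interrupt__": (SimpleNamespace(value="Please provide feedback:"),)},
]
print(pending_prompts(events))
```

With a real graph, the same helper could be fed the events yielded by graph.stream(...), assuming the event shape matches the output shown here.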

Now we can resume the graph with the user's input, which the human_feedback node writes into the graph state:

# Continue the graph execution
for event in graph.stream(
    Command(resume="go to step 3!"), thread, stream_mode="updates"
):
    print(event)
    print("\n")
---human_feedback---
{'human_feedback': {'user_feedback': 'go to step 3!'}}


---Step 3---
{'step_3': None}

We can see that our feedback has been added to the state:

graph.get_state(thread).values
{'input': 'hello world', 'user_feedback': 'go to step 3!'}
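
Note that `---human_feedback---` is printed twice in the outputs above: once when the node pauses and once when it resumes. This is consistent with the node body being re-run from its start on resume, with interrupt() returning the resume value on the second pass. The toy model below illustrates that behaviour using only the standard library. `ToyRunner` and `Paused` are hypothetical names, and this is a sketch of the idea, not LangGraph's implementation.

```python
class Paused(Exception):
    """Raised by the toy interrupt() when no resume value is available yet."""

    def __init__(self, prompt):
        super().__init__(prompt)
        self.prompt = prompt


class ToyRunner:
    """Toy stand-in for a graph plus checkpointer (not LangGraph itself)."""

    def __init__(self, node):
        self.node = node
        self.saved = {}      # thread_id -> state saved at the pause
        self.calls = 0       # how many times the node body has started
        self._resume = None  # value handed back by interrupt() on resume

    def interrupt(self, prompt):
        # First pass: nothing to return yet, so pause. On resume: return the value.
        if self._resume is None:
            raise Paused(prompt)
        return self._resume

    def run(self, thread_id, state=None, resume=None):
        self._resume = resume
        if state is None:
            state = self.saved[thread_id]  # reload the checkpointed state
        self.calls += 1
        try:
            update = self.node(state, self.interrupt)
        except Paused as pause:
            self.saved[thread_id] = state  # "checkpoint" before pausing
            return {"__interrupt__": pause.prompt}
        return {**state, **update}


def human_feedback(state, interrupt):
    print("---human_feedback---")  # runs on every pass, including the resume
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


runner = ToyRunner(human_feedback)
first = runner.run("1", state={"input": "hello world"})
second = runner.run("1", resume="go to step 3!")
print(first)
print(second)
print(runner.calls)
```

One practical consequence, under this model, is that any side effect placed before the interrupt() call (such as the print here) happens again on resume, so such code is best kept idempotent or moved after the call.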

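Agent

In the context of agents, waiting for user feedback is especially useful for asking clarifying questions. To illustrate, we'll create a simple ReAct-style agent capable of tool calling.

For this example, we'll use Anthropic's chat model along with a **mock tool** (for demonstration purposes only).

Using Pydantic with LangChain

This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.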

# Set up the state
from langgraph.graph import MessagesState, START

# Set up the tool
# We will have one real tool - a search tool
# We'll also have one "fake" tool - a "ask_human" tool
# Here we define any ACTUAL tools
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode


@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return f"I looked up: {query}. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈."


tools = [search]
tool_node = ToolNode(tools)

# Set up the model
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

from pydantic import BaseModel


# We are going to "bind" all tools to the model
# We have the ACTUAL tools from above, but we also need a mock tool to ask a human
# Since `bind_tools` takes in tools but also just tool definitions,
# We can define a tool definition for `ask_human`
class AskHuman(BaseModel):
    """Ask the human a question"""

    question: str


model = model.bind_tools(tools + [AskHuman])

# Define nodes and conditional edges


# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return END
    # If tool call is asking Human, we return that node
    # You could also add logic here to let some system know that there's something that requires Human input
    # For example, send a slack message, etc
    elif last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # Otherwise if there is, we continue
    else:
        return "action"


# Define the function that calls the model
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# We define a fake node to ask the human
def ask_human(state):
    tool_call_id = state["messages"][-1].tool_calls[0]["id"]
    ask = AskHuman.model_validate(state["messages"][-1].tool_calls[0]["args"])
    location = interrupt(ask.question)
    tool_message = [{"tool_call_id": tool_call_id, "type": "tool", "content": location}]
    return {"messages": tool_message}


# Build the graph

from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(MessagesState)

# Define the three nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

# We now add a normal edge from `action` to `agent`.
# This means that after `action` is called, `agent` node is called next.
workflow.add_edge("action", "agent")

# After we get back the human response, we go back to the agent
workflow.add_edge("ask_human", "agent")

# Set up memory
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
# No static breakpoint is needed: `ask_human` pauses itself by calling `interrupt()`
app = workflow.compile(checkpointer=memory)

display(Image(app.get_graph().draw_mermaid_png()))
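
The routing performed by should_continue can be checked in isolation, without calling a model. The sketch below repeats the same branching logic and feeds it stand-in messages. `ai_message` is a hypothetical helper, and `END = "__end__"` is assumed here as a stand-in for the constant imported from langgraph.graph.

```python
from types import SimpleNamespace

END = "__end__"  # assumption: stand-in for langgraph.graph.END


def should_continue(state):
    last_message = state["messages"][-1]
    # No tool call at all: the agent is done.
    if not last_message.tool_calls:
        return END
    # The first tool call asks the human: route to the interrupting node.
    if last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # Any other tool call goes to the regular tool node.
    return "action"


def ai_message(*tool_names):
    """Hypothetical helper: a minimal object exposing only `tool_calls`."""
    calls = [
        {"name": name, "args": {}, "id": f"call_{i}"}
        for i, name in enumerate(tool_names)
    ]
    return SimpleNamespace(tool_calls=calls)


for names in [(), ("AskHuman",), ("search",), ("search", "AskHuman")]:
    route = should_continue({"messages": [ai_message(*names)]})
    print(names, "->", route)
```

Only the first tool call is inspected, so a message that requests both search and AskHuman is routed by whichever call comes first.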


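Interacting with the Agent

We can now interact with the agent. Let's ask it to ask the user where they are, then tell them the weather there.

This should make it use the ask_human tool first, and then the regular search tool.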

config = {"configurable": {"thread_id": "2"}}
for event in app.stream(
    {
        "messages": [
            (
                "user",
                "Use the search tool to ask the user where they are, then look up the weather there",
            )
        ]
    },
    config,
    stream_mode="values",
):
    event["messages"][-1].pretty_print()
================================ Human Message =================================

Use the search tool to ask the user where they are, then look up the weather there
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_01KNvb7RCVu8yKYUuQQSKN1x', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_01KNvb7RCVu8yKYUuQQSKN1x)
 Call ID: toolu_01KNvb7RCVu8yKYUuQQSKN1x
  Args:
    question: Where are you located?

app.get_state(config).next
('ask_human',)

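You can see that our graph was interrupted inside the ask_human node, which is now waiting for a location to be provided. We can provide this value by invoking the graph with a Command(resume="<location>") input: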

for event in app.stream(Command(resume="san francisco"), config, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_01KNvb7RCVu8yKYUuQQSKN1x', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_01KNvb7RCVu8yKYUuQQSKN1x)
 Call ID: toolu_01KNvb7RCVu8yKYUuQQSKN1x
  Args:
    question: Where are you located?
================================= Tool Message =================================

san francisco
================================== Ai Message ==================================

[{'text': "Now I'll search for the weather in San Francisco.", 'type': 'text'}, {'id': 'toolu_01Y5C4rU9WcxBqFLYSMGjV1F', 'input': {'query': 'current weather in san francisco'}, 'name': 'search', 'type': 'tool_use'}]
Tool Calls:
  search (toolu_01Y5C4rU9WcxBqFLYSMGjV1F)
 Call ID: toolu_01Y5C4rU9WcxBqFLYSMGjV1F
  Args:
    query: current weather in san francisco
================================= Tool Message =================================
Name: search

I looked up: current weather in san francisco. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈.
================================== Ai Message ==================================

Based on the search results, it's currently sunny in San Francisco. Note that this is the current weather at the time of our conversation, and conditions can change throughout the day.
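
The "Tool Message" containing `san francisco` in the output above is the resumed value wrapped by ask_human and tied to the pending tool call's id. A minimal sketch of that bookkeeping with plain dicts is shown below. `make_tool_reply` is a hypothetical helper that mirrors the return value of ask_human, and no LangGraph objects are involved.

```python
def make_tool_reply(tool_call, answer):
    """Hypothetical helper mirroring what ask_human returns after interrupt()."""
    return {
        "messages": [
            {
                # Must match the id of the tool call being answered.
                "tool_call_id": tool_call["id"],
                "type": "tool",
                "content": answer,
            }
        ]
    }


# Tool call copied from the AI message shown in the output above.
tool_call = {
    "name": "AskHuman",
    "args": {"question": "Where are you located?"},
    "id": "toolu_01KNvb7RCVu8yKYUuQQSKN1x",
}
reply = make_tool_reply(tool_call, "san francisco")
print(reply["messages"][0])
```

Answering with the matching `tool_call_id` is what lets the model treat the human's reply as the result of its AskHuman call when the agent node runs again.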
