
How to wait for user input using interrupt

Prerequisites

This guide assumes familiarity with LangGraph's human-in-the-loop concepts, such as breakpoints and checkpointers.

Human-in-the-loop (HIL) interactions are crucial for agentic systems. Waiting for user input is a common HIL interaction pattern: it allows the agent to ask the user clarifying questions and wait for the input before proceeding.

We can implement this in LangGraph using the interrupt() function. interrupt allows us to pause graph execution to collect input from the user, and to resume execution once the input has been collected.
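
At its core the pattern has two halves: inside a node, interrupt(payload) pauses the run and surfaces payload to the caller; afterwards, invoking the graph again on the same thread with Command(resume=value) makes that same interrupt() call return value. Below is a minimal sketch of that shape (the node, state, and thread names are illustrative; the rest of this guide walks through a full example):

from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    answer: str


def ask(state: State):
    # Pauses here on the first run; returns the resume value on the second
    reply = interrupt("What is your name?")
    return {"answer": reply}


builder = StateGraph(State)
builder.add_node("ask", ask)
builder.add_edge(START, "ask")
# A checkpointer is required so the paused state can be saved and resumed
g = builder.compile(checkpointer=MemorySaver())

cfg = {"configurable": {"thread_id": "demo"}}
g.invoke({"answer": ""}, cfg)         # pauses at interrupt()
g.invoke(Command(resume="Ada"), cfg)  # resumes; state becomes {'answer': 'Ada'}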

Setup

First we need to install the required packages:

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_anthropic

Next, we need to set the API key for Anthropic (the LLM we will use):

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
ANTHROPIC_API_KEY:  ········

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM applications built with LangGraph. See here for more information on how to get started.

Simple Usage

Let's explore a basic example of using human feedback. A straightforward approach is to create a node, human_feedback, that is dedicated to collecting user input. This lets us gather feedback at a specific point in our graph.

Steps:

  1. Call interrupt() in the human_feedback node.
  2. Set up a checkpointer to save the graph's state up to this node.
  3. Use Command(resume=...) to provide the requested value to the human_feedback node and resume execution.

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str
    user_feedback: str


def step_1(state):
    print("---Step 1---")
    pass


def human_feedback(state):
    print("---human_feedback---")
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=memory)

# View the graph
display(Image(graph.get_graph().draw_mermaid_png()))

API Reference: StateGraph | START | END | Command | interrupt | MemorySaver

Run the graph until the interrupt in human_feedback:

# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")
---Step 1---
{'step_1': None}


---human_feedback---
{'__interrupt__': (Interrupt(value='Please provide feedback:', resumable=True, ns=['human_feedback:e9a51d27-22ed-8c01-3f17-0ed33209b554'], when='during'),)}

Now we can resume the graph with the user's input:

# Continue the graph execution
for event in graph.stream(
    Command(resume="go to step 3!"), thread, stream_mode="updates"
):
    print(event)
    print("\n")
---human_feedback---
{'human_feedback': {'user_feedback': 'go to step 3!'}}


---Step 3---
{'step_3': None}

We can see our feedback was added to the state:

graph.get_state(thread).values
{'input': 'hello world', 'user_feedback': 'go to step 3!'}
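
As an aside, before resuming you can check programmatically whether a thread is paused and what it is waiting on by inspecting the saved state snapshot. A minimal sketch; the tasks and interrupts attributes assume a reasonably recent langgraph release:

snapshot = graph.get_state(thread)

if snapshot.next:  # a non-empty tuple means the run is paused before these nodes
    print("Paused before:", snapshot.next)
    # Pending interrupt payloads hang off the task objects
    # (assumption: `tasks` / `interrupts` attributes on the snapshot)
    for task in snapshot.tasks:
        for intr in task.interrupts:
            print("Waiting on:", intr.value)
else:
    print("Run complete:", snapshot.values)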

Agent

In the context of agents, waiting for user feedback is especially useful for asking clarifying questions. To illustrate this, we will create a simple ReAct-style agent that is capable of tool calling.

In this example we will use Anthropic's chat model along with a **mock tool** (purely for demonstration purposes).

Using Pydantic with LangChain

This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing Pydantic v1 and v2 BaseModels.
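
If you are unsure which version you have installed, a quick check:

import langchain_core

# Pydantic v2 BaseModel tool schemas (as used below) need langchain-core >= 0.3
print(langchain_core.__version__)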

# Set up the state
from langgraph.graph import MessagesState, START

# Set up the tool
# We will have one real tool - a search tool
# We'll also have one "fake" tool - an "ask_human" tool
# Here we define any ACTUAL tools
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode


@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return f"I looked up: {query}. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈."


tools = [search]
tool_node = ToolNode(tools)

# Set up the model
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

from pydantic import BaseModel


# We are going to bind all tools to the model
# We have the ACTUAL tools from above, but we also need a mock tool to ask a human
# Since `bind_tools` takes in tools but also just tool definitions,
# We can define a tool definition for `ask_human`
class AskHuman(BaseModel):
    """Ask the human a question"""

    question: str


model = model.bind_tools(tools + [AskHuman])

# Define nodes and conditional edges


# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return END
    # If tool call is asking Human, we return that node
    # You could also add logic here to let some system know that there's something that requires Human input
    # For example, send a slack message, etc
    elif last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # Otherwise if there is, we continue
    else:
        return "action"


# Define the function that calls the model
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# We define a fake node to ask the human
def ask_human(state):
    tool_call_id = state["messages"][-1].tool_calls[0]["id"]
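    # interrupt() pauses the run here; the value later passed via
    # Command(resume=...) becomes the return value of this call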
    location = interrupt("Please provide your location:")
    tool_message = [{"tool_call_id": tool_call_id, "type": "tool", "content": location}]
    return {"messages": tool_message}


# Build the graph

from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(MessagesState)

# Define the three nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

# We now add a normal edge from `action` to `agent`.
# This means that after `action` (the tool node) is called, the `agent` node is called next.
workflow.add_edge("action", "agent")

# After we get back the human response, we go back to the agent
workflow.add_edge("ask_human", "agent")

# Set up memory
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
# Execution pauses inside the `ask_human` node via interrupt(),
# so no explicit breakpoint is needed here
app = workflow.compile(checkpointer=memory)

display(Image(app.get_graph().draw_mermaid_png()))

API Reference: START | tool | ToolNode | ChatAnthropic | END | StateGraph | MemorySaver

Interacting with the Agent

We can now interact with the agent. Let's ask it to find out where the user is located, then tell the user the weather.

This should cause it to first use the ask_human tool, and then use the normal search tool.

config = {"configurable": {"thread_id": "2"}}
for event in app.stream(
    {
        "messages": [
            (
                "user",
                "Use the search tool to ask the user where they are, then look up the weather there",
            )
        ]
    },
    config,
    stream_mode="values",
):
    event["messages"][-1].pretty_print()
================================ Human Message =================================

Use the search tool to ask the user where they are, then look up the weather there
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_01KNvb7RCVu8yKYUuQQSKN1x', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_01KNvb7RCVu8yKYUuQQSKN1x)
 Call ID: toolu_01KNvb7RCVu8yKYUuQQSKN1x
  Args:
    question: Where are you located?

app.get_state(config).next
('ask_human',)

You can see that our graph got interrupted inside the ask_human node, which is now waiting for a location to be provided. We can provide this value by invoking the graph with a Command(resume="<location>") input:

for event in app.stream(Command(resume="san francisco"), config, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_01KNvb7RCVu8yKYUuQQSKN1x', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_01KNvb7RCVu8yKYUuQQSKN1x)
 Call ID: toolu_01KNvb7RCVu8yKYUuQQSKN1x
  Args:
    question: Where are you located?
================================= Tool Message =================================

san francisco
================================== Ai Message ==================================

[{'text': "Now I'll search for the weather in San Francisco.", 'type': 'text'}, {'id': 'toolu_01Y5C4rU9WcxBqFLYSMGjV1F', 'input': {'query': 'current weather in san francisco'}, 'name': 'search', 'type': 'tool_use'}]
Tool Calls:
  search (toolu_01Y5C4rU9WcxBqFLYSMGjV1F)
 Call ID: toolu_01Y5C4rU9WcxBqFLYSMGjV1F
  Args:
    query: current weather in san francisco
================================= Tool Message =================================
Name: search

I looked up: current weather in san francisco. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈.
================================== Ai Message ==================================

Based on the search results, it's currently sunny in San Francisco. Note that this is the current weather at the time of our conversation, and conditions can change throughout the day.
