Skip to content

如何等待用户输入(功能 API)

前提条件

本指南假设您熟悉以下内容:

Human-in-the-loop (HIL) 交互对于 agentic systems 至关重要。等待用户输入是一种常见的 HIL 交互模式,允许代理向用户提出澄清问题,并在收到输入后继续执行。

我们可以在 LangGraph 中使用 [interrupt()][langgraph.types.interrupt] 函数实现这一点。interrupt 允许我们在收集用户输入时停止图的执行,并在收集到输入后继续执行。

本指南将演示如何使用 LangGraph 的 Functional API 实现 human-in-the-loop 工作流程。具体来说,我们将展示:

  1. 一个简单的使用示例
  2. 如何与 ReAct 代理一起使用

设置

首先,让我们安装所需的包并设置我们的 API 密钥:

pip install -U langgraph langchain-openai
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

设置 LangSmith 以获得更好的调试体验

注册 LangSmith 可以快速发现并解决您的 LangGraph 项目中的问题,提高性能。LangSmith 允许您使用追踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 —— 有关如何入门的更多信息,请参阅 文档

简单使用

让我们通过一个简单的示例来演示。我们将创建三个 任务

  1. 附加 "bar"
  2. 暂停以等待人工输入。恢复时,附加人工输入。
  3. 附加 "qux"

API Reference: entrypoint | task | Command | interrupt

from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """Append bar."""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """Append user input."""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """Append qux."""
    return f"{input_query} qux"

我们现在可以将这些任务组合到一个简单的入口点中:

API Reference: MemorySaver

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3

我们所做的一切,以启用人类在循环中的工作流程,就是在任务内调用 interrupt()

Tip

此前任务的结果——在此例中为 step_1——会被保存下来,因此在 interrupt 之后不会再次运行。

让我们发送一个查询字符串:

config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
{'step_1': 'foo bar'}


{'__interrupt__': (Interrupt(value='Please provide feedback: foo bar', resumable=True, ns=['graph:d66b2e35-0ee3-d8d6-1a22-aec9d58f13b9', 'human_feedback:e0cd4ee2-b874-e1d2-8bc4-3f7ddc06bcc2'], when='during'),)}

请注意,我们在 step_1 之后使用了 interrupt 暂停。该中断提供了恢复运行的指令。要恢复运行,我们发出一个包含 human_feedback 任务所期望数据的 Command

# Continue execution
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
{'human_feedback': 'foo bar baz'}


{'step_3': 'foo bar baz qux'}


{'graph': 'foo bar baz qux'}

恢复后,运行将继续执行剩余的步骤并按预期终止。

Agent

我们将基于如何使用功能 API 创建 ReAct 代理指南中创建的代理进行构建。

在此,我们将通过允许代理在需要时向人类寻求帮助来扩展其功能。

定义模型和工具

首先,让我们定义用于示例的工具和模型。与ReAct 代理指南一样,我们将使用一个占位符工具,该工具可以获取某个位置的天气描述。

对于这个示例,我们将使用 OpenAI 聊天模型,但任何支持工具调用的模型 支持工具调用 都可以满足需求。

API Reference: ChatOpenAI | tool

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

model = ChatOpenAI(model="gpt-4o-mini")


@tool
def get_weather(location: str):
    """Call to get the weather from a specific location."""
    # This is a placeholder for the actual implementation
    if any([city in location.lower() for city in ["sf", "san francisco"]]):
        return "It's sunny!"
    elif "boston" in location.lower():
        return "It's rainy!"
    else:
        return f"I am not sure what the weather is in {location}"

要寻求人工帮助,我们只需添加一个调用interrupt的工具:

API Reference: Command | interrupt

from langgraph.types import Command, interrupt


@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]


tools = [get_weather, human_assistance]

定义任务

我们的任务与ReAct 代理指南中的其他任务没有变化:

  1. 调用模型:我们希望使用消息列表来查询我们的聊天模型。
  2. 调用工具:如果模型生成了工具调用,我们希望执行这些调用。

我们只是为模型多提供了一个可用的工具。

API Reference: ToolMessage | entrypoint | task

from langchain_core.messages import ToolMessage
from langgraph.func import entrypoint, task

tools_by_name = {tool.name: tool for tool in tools}


@task
def call_model(messages):
    """Call model with a sequence of messages."""
    response = model.bind_tools(tools).invoke(messages)
    return response


@task
def call_tool(tool_call):
    tool = tools_by_name[tool_call["name"]]
    observation = tool.invoke(tool_call)
    return ToolMessage(content=observation, tool_call_id=tool_call["id"])

定义入口点

我们的 entrypointReAct 代理指南 中的也保持不变:

API Reference: MemorySaver | add_messages

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.message import add_messages

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    llm_response = call_model(messages).result()
    while True:
        if not llm_response.tool_calls:
            break

        # Execute tools
        tool_result_futures = [
            call_tool(tool_call) for tool_call in llm_response.tool_calls
        ]
        tool_results = [fut.result() for fut in tool_result_futures]

        # Append to message list
        messages = add_messages(messages, [llm_response, *tool_results])

        # Call model again
        llm_response = call_model(messages).result()

    # Generate final response
    messages = add_messages(messages, llm_response)
    return entrypoint.final(value=llm_response, save=messages)

用法

让我们使用一个需要人类协助的问题来调用我们的模型。我们的问题还将需要调用 get_weather 工具:

def _print_step(step: dict) -> None:
    for task_name, result in step.items():
        if task_name == "agent":
            continue  # just stream from tasks
        print(f"\n{task_name}:")
        if task_name == "__interrupt__":
            print(result)
        else:
            result.pretty_print()
config = {"configurable": {"thread_id": "1"}}

user_message = {
    "role": "user",
    "content": (
        "Can you reach out for human assistance: what should I feed my cat? "
        "Separately, can you check the weather in San Francisco?"
    ),
}
print(user_message)

for step in agent.stream([user_message], config):
    _print_step(step)
{'role': 'user', 'content': 'Can you reach out for human assistance: what should I feed my cat? Separately, can you check the weather in San Francisco?'}

call_model:
================================== Ai Message ==================================
Tool Calls:
  human_assistance (call_joAEBVX7Abfm7TsZ0k95ZkVx)
 Call ID: call_joAEBVX7Abfm7TsZ0k95ZkVx
  Args:
    query: What should I feed my cat?
  get_weather (call_ut7zfHFCcms63BOZLrRHszGH)
 Call ID: call_ut7zfHFCcms63BOZLrRHszGH
  Args:
    location: San Francisco

call_tool:
================================= Tool Message =================================

content="It's sunny!" name='get_weather' tool_call_id='call_ut7zfHFCcms63BOZLrRHszGH'

__interrupt__:
(Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)

请注意,我们生成了两次工具调用,尽管我们的运行被中断了,但我们并没有阻止 get_weather 工具的执行。

让我们检查一下我们在哪里被中断了:

print(step)
{'__interrupt__': (Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)}

我们可以通过发出一个Command来恢复执行。请注意,我们在Command中提供的数据可以根据human_assistance的实现方式进行自定义。

human_response = "You should feed your cat a fish."
human_command = Command(resume={"data": human_response})

for step in agent.stream(human_command, config):
    _print_step(step)
call_tool:
================================= Tool Message =================================

content='You should feed your cat a fish.' name='human_assistance' tool_call_id='call_joAEBVX7Abfm7TsZ0k95ZkVx'

call_model:
================================== Ai Message ==================================

For human assistance, you should feed your cat fish. 

Regarding the weather in San Francisco, it's sunny!

如上所述,当我们恢复时,我们提供最终的工具消息,允许模型生成其响应。查看 LangSmith 的追踪,以了解运行的完整分解:

  1. 初始查询的追踪
  2. 恢复后的追踪