Skip to content

如何添加断点

前提条件

本指南假设您熟悉以下概念:

人工介入循环(HIL)交互对于代理系统至关重要。断点是一种常见的HIL交互模式,允许图在特定步骤停止并寻求人类批准后再继续执行(例如,对于敏感操作)。

断点是基于LangGraph 检查点构建的,这些检查点在每个节点执行后保存图的状态。检查点保存在线程中,这些线程保留图的状态,并且可以在图执行完成后访问。这允许图在特定点暂停,等待人类批准,然后从最后一个检查点恢复执行。

approval.png

环境搭建

首先我们需要安装所需的包

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_anthropic

接下来,我们需要为Anthropic(我们将使用的大型语言模型)设置API密钥

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
ANTHROPIC_API_KEY:  ········

为LangGraph开发设置LangSmith

注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的内容,请参阅这里

简单用法

让我们来看一下这个工具的基本用法。

下面,我们做了两件事情:

1) 我们通过interrupt_before指定断点

2) 我们设置了一个检查点来保存图的状态。

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str


def step_1(state):
    print("---Step 1---")
    pass


def step_2(state):
    print("---Step 2---")
    pass


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Add
graph = builder.compile(checkpointer=memory, interrupt_before=["step_3"])

# View
display(Image(graph.get_graph().draw_mermaid_png()))

API Reference: StateGraph | START | END | MemorySaver

我们为检查点创建了一个线程ID

我们运行到步骤3,如interrupt_before中定义的。

在用户输入/批准后,我们通过调用图并传递None来恢复执行

# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    print(event)

try:
    user_approval = input("Do you want to go to Step 3? (yes/no): ")
except:
    user_approval = "yes"

if user_approval.lower() == "yes":
    # If approved, continue the graph execution
    for event in graph.stream(None, thread, stream_mode="values"):
        print(event)
else:
    print("Operation cancelled by user.")
{'input': 'hello world'}
---Step 1---
---Step 2---
``````output
Do you want to go to Step 3? (yes/no):  yes
``````output
---Step 3---

代理

在代理的上下文中,断点对于手动批准某些代理操作非常有用。

为了展示这一点,我们将构建一个相对简单的ReAct风格的代理,该代理执行工具调用。

我们将在action节点调用之前添加一个断点。

# Set up the tool
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
from langgraph.graph import MessagesState, START
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver


@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return [
        "It's sunny in San Francisco, but you better look out if you're a Gemini 😈."
    ]


tools = [search]
tool_node = ToolNode(tools)

# Set up the model

model = ChatAnthropic(model="claude-3-5-sonnet-20240620")
model = model.bind_tools(tools)


# Define nodes and conditional edges


# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return "end"
    # Otherwise if there is, we continue
    else:
        return "continue"


# Define the function that calls the model
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# Define a new graph
workflow = StateGraph(MessagesState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
    {
        # If `tools`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END,
    },
)

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")

# Set up memory
memory = MemorySaver()

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable

# We add in `interrupt_before=["action"]`
# This will add a breakpoint before the `action` node is called
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])

display(Image(app.get_graph().draw_mermaid_png()))

API Reference: ChatAnthropic | tool | START | ToolNode | END | StateGraph | MemorySaver

与代理交互

我们现在可以与代理进行交互。

我们看到,在调用工具之前它会停止,因为 interrupt_before 被设置在了 action 节点之前。

from langchain_core.messages import HumanMessage

thread = {"configurable": {"thread_id": "3"}}
inputs = [HumanMessage(content="search for the weather in sf now")]
for event in app.stream({"messages": inputs}, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()

API Reference: HumanMessage

================================ Human Message =================================

search for the weather in sf now
================================== Ai Message ==================================

[{'text': "Certainly! I'll search for the current weather in San Francisco for you. Let me use the search function to find this information.", 'type': 'text'}, {'text': None, 'type': 'tool_use', 'id': 'toolu_011ezBx5hKKjVJwqnECNPyyC', 'name': 'search', 'input': {'query': 'current weather in San Francisco'}}]
简历

我们现在可以不带任何输入参数再次调用代理以继续执行。

这将按照请求运行工具。

在输入中使用None来运行一个被中断的图,意味着要“继续执行,就好像中断没有发生一样”。

for event in app.stream(None, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================= Tool Message =================================
Name: search

["It's sunny in San Francisco, but you better look out if you're a Gemini \ud83d\ude08."]
================================== Ai Message ==================================

Based on the search results, I can provide you with information about the current weather in San Francisco:

The weather in San Francisco right now is sunny. 

It's worth noting that the search result includes a playful reference to astrology, suggesting that Geminis should "look out." However, this is likely just a humorous addition and not related to the actual weather conditions.

Is there anything else you'd like to know about the weather in San Francisco or any other location?

Comments