Skip to content

如何为并行节点执行创建分支

前提条件

本指南假设你熟悉以下内容:

节点的并行执行对于加快整个图操作的速度至关重要。LangGraph 原生支持节点的并行执行,这可以显著提升基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,同时利用标准边和条件边。以下是一些示例,展示了如何创建适合你的分支数据流。

Screenshot 2024-07-09 at 2.55.56 PM.png

安装设置

首先,让我们安装所需的软件包。

%%capture --no-stderr
%pip install -U langgraph

为 LangGraph 开发设置 LangSmith

注册 LangSmith,以便快速发现问题并提升你的 LangGraph 项目的性能。LangSmith 允许你使用跟踪数据来调试、测试和监控使用 LangGraph 构建的大语言模型应用程序 — 点击 此处 了解更多关于如何开始使用的信息。

如何并行运行图节点

在这个示例中,我们从 节点 A 扇出到 节点 B 和节点 C,然后扇入到 节点 D。对于我们的状态,我们指定了归约器加法操作。这将合并或累积状态中特定键的值,而不是简单地覆盖现有值。对于列表而言,这意味着将新列表与现有列表连接起来。有关使用归约器更新状态的更多详细信息,请参阅本指南

import operator
from typing import Annotated, Any

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

使用归约器,你可以看到每个节点中添加的值都被累加起来了。

graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']

{'aggregate': ['A', 'B', 'C', 'D']}

Note

在上述示例中,节点 "b""c" 在同一个超级步中并发执行。由于它们处于同一步骤,节点 "d" 会在 "b""c" 都完成后执行。

重要的是,并行超级步的更新可能无法保证一致的顺序。如果您需要并行超级步的更新具有一致的、预先确定的顺序,则应将输出与用于排序的值一起写入状态的单独字段中。

异常处理?

LangGraph 在 "超级步(supersteps)" 内执行节点,这意味着虽然并行分支是并行执行的,但整个超级步是 事务性的。如果这些分支中的任何一个抛出异常,所有 更新都不会应用到状态中(整个超级步出错)。

重要的是,当使用 检查点器(checkpointer) 时,超级步内成功节点的结果会被保存,并且在恢复时不会重复执行。

如果你的代码容易出错(例如想要处理不稳定的 API 调用),LangGraph 提供了两种方法来解决这个问题:
  1. 你可以在节点内编写常规的 Python 代码来捕获和处理异常。
  2. 你可以设置一个 重试策略(retry_policy),以指示图对抛出特定类型异常的节点进行重试。只有失败的分支会被重试,因此你不必担心执行冗余的工作。

综合起来,这些功能让你可以进行并行执行并完全控制异常处理。

带有额外步骤的并行节点扇出和扇入

上述示例展示了在每条路径仅包含一个步骤时如何进行扇出和扇入。但如果某条路径包含多个步骤,情况会怎样呢?让我们在 “b” 分支中添加一个节点 b_2

def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge(["b_2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']

{'aggregate': ['A', 'B', 'C', 'B_2', 'D']}

Note

在上述示例中,节点 "b""c" 在同一个超步中并发执行。下一步会发生什么呢?

我们在这里使用 add_edge(["b_2", "c"], "d") 来强制节点 "d" 仅在节点 "b_2""c" 都完成执行后才运行。如果我们添加两条单独的边, 节点 "d" 将运行两次:一次在节点 b2 完成后,另一次在节点 c 完成后(无论这些节点以何种顺序完成)。

条件分支

如果你的扇出不是确定性的,你可以直接使用 add_conditional_edges

import operator
from typing import Annotated, Sequence

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Add a key to the state. We will set this key to determine
    # how we branch.
    which: str


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


def e(state: State):
    print(f'Adding "E" to {state["aggregate"]}')
    return {"aggregate": ["E"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_node(e)
builder.add_edge(START, "a")


def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]


intermediates = ["b", "c", "d"]
builder.add_conditional_edges(
    "a",
    route_bc_or_cd,
    intermediates,
)
for node in intermediates:
    builder.add_edge(node, "e")

builder.add_edge("e", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

graph.invoke({"aggregate": [], "which": "bc"})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "E" to ['A', 'B', 'C']

{'aggregate': ['A', 'B', 'C', 'E'], 'which': 'bc'}

graph.invoke({"aggregate": [], "which": "cd"})
Adding "A" to []
Adding "C" to ['A']
Adding "D" to ['A']
Adding "E" to ['A', 'C', 'D']

{'aggregate': ['A', 'C', 'D', 'E'], 'which': 'cd'}

后续步骤

  • 继续阅读图 API 基础指南。
  • 了解如何创建映射 - 归约分支,在这些分支中,不同的状态可以分发到节点的多个实例。

Comments