如何使用图 API¶
本指南演示了 LangGraph 图 API 的基础知识。它将逐步介绍 状态 的定义和更新,以及如何组合常见的图结构,例如 序列、分支 和 循环。此外,它还涵盖了 LangGraph 的控制功能,包括用于映射-归约工作流的 Send API 以及用于将状态更新与节点间“跳转”结合的 Command API。
设置¶
安装 langgraph
:
设置 LangSmith 以获得更好的调试体验
注册 LangSmith 可以快速发现并解决您 LangGraph 项目中的问题,提升性能。LangSmith 允许您使用追踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 — 有关如何入门的更多信息,请参阅 文档。
定义和更新状态¶
在此我们展示如何在 LangGraph 中定义和更新 状态。我们将演示:
定义状态¶
LangGraph 中的 State 可以是 TypedDict
、Pydantic
模型或 dataclass。下面我们将使用 TypedDict
。有关使用 Pydantic 的详细信息,请参阅 此部分。
默认情况下,图将具有相同的输入和输出模式,而状态决定了该模式。有关如何定义不同的输入和输出模式,请参阅 此部分。
让我们考虑一个使用 messages 的简单示例。这代表了许多 LLM 应用程序中状态的一种灵活形式。有关更多细节,请参阅我们的 概念页面。
API Reference: AnyMessage
from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict
class State(TypedDict):
messages: list[AnyMessage]
extra_field: int
此状态跟踪一个message对象的列表,以及一个额外的整数字段。
更新状态¶
让我们构建一个包含单个节点的示例图。我们的node只是一个Python函数,它读取图的状态并对它进行更新。该函数的第一个参数始终是状态:
API Reference: AIMessage
from langchain_core.messages import AIMessage
def node(state: State):
messages = state["messages"]
new_message = AIMessage("Hello!")
return {"messages": messages + [new_message], "extra_field": 10}
此节点只是将一条消息附加到我们的消息列表中,并填充一个额外的字段。
Important
节点应直接返回对状态的更新,而不是修改状态。
接下来,我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个操作此状态的图。然后我们使用 add_node 来填充我们的图。
API Reference: StateGraph
from langgraph.graph import StateGraph
builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()
LangGraph 提供了内置的工具用于可视化你的图。让我们来查看我们的图。有关可视化的详细信息,请参阅此部分。
在这种情况下,我们的图仅执行单个节点。让我们进行一个简单的调用:
API Reference: HumanMessage
from langchain_core.messages import HumanMessage
result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi', additional_kwargs={}, response_metadata={}),
AIMessage(content='Hello!', additional_kwargs={}, response_metadata={})],
'extra_field': 10}
请注意:
- 我们通过更新状态中的一个键来触发调用。
- 我们在调用结果中接收到整个状态。
为了方便,我们经常通过漂亮的打印方式检查 message objects 的内容:
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
使用 reducer 处理状态更新¶
状态中的每个键都可以拥有其独立的 reducer 函数,用于控制如何应用来自节点的更新。如果没有显式指定 reducer 函数,则默认所有对该键的更新都将覆盖它。
对于 TypedDict
类型的状态模式,我们可以通过将状态中相应字段标注为 reducer 函数来定义 reducer。
在前面的例子中,我们的节点通过向 "messages"
键追加一条消息来更新状态。下面,我们为这个键添加一个 reducer,使得更新会自动被追加:
from langroid.agent.impl.llm.ollama import OllamaLLM
from langroid.agent.task import Task
from langroid.agent.message import Message
from langroid.agent.state import State, typeddict_state
from langroid.agent.reducer import reducer
@typeddict_state
class MyState(TypedDict):
messages: Annotated[list[str], reducer(append)]
IMG_PLACEHOLDER_1
Now, when the node emits an update to the "messages"
key, it will be automatically appended to the existing list, rather than overwriting it.
If you want to apply more complex logic, you can define a custom reducer function and annotate the field with it. For example:
def custom_reducer(current_value: list[str], new_value: str) -> list[str]:
# Custom logic here
return current_value + [new_value.upper()]
@typeddict_state
class MyState(TypedDict):
messages: Annotated[list[str], reducer(custom_reducer)]
IMG_PLACEHOLDER_2
This way, any updates to the "messages"
key will be processed through the custom_reducer
function before being applied to the state.
Reducers are a powerful tool for managing state in a predictable and consistent manner, especially when dealing with multiple updates or complex state transformations.
from typing_extensions import Annotated
def add(left, right):
"""Can also import `add` from the `operator` built-in."""
return left + right
class State(TypedDict):
messages: Annotated[list[AnyMessage], add]
extra_field: int
现在我们的节点可以简化为:
def node(state: State):
new_message = AIMessage("Hello!")
return {"messages": [new_message], "extra_field": 10}
API Reference: START
from langgraph.graph import START
graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()
result = graph.invoke({"messages": [HumanMessage("Hi")]})
for message in result["messages"]:
message.pretty_print()
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
MessagesState¶
在实际应用中,更新消息列表时还需要考虑以下几点:
LangGraph 包含一个内置的 reducer add_messages
,用于处理这些考虑事项:
API Reference: add_messages
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
extra_field: int
def node(state: State):
new_message = AIMessage("Hello!")
return {"messages": [new_message], "extra_field": 10}
graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
input_message = {"role": "user", "content": "Hi"}
result = graph.invoke({"messages": [input_message]})
for message in result["messages"]:
message.pretty_print()
================================ Human Message =================================
Hi
================================== Ai Message ==================================
Hello!
这是涉及聊天模型的应用程序中状态的一种多功能表示方式。LangGraph 包含一个预构建的 MessagesState
,以便我们能够拥有:
定义输入和输出模式¶
默认情况下,StateGraph
使用单一的模式,所有节点都期望使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。
当指定了不同的模式时,节点之间通信仍会使用一个内部模式。输入模式确保提供的输入与预期的结构匹配,而输出模式会根据定义的输出模式过滤内部数据,仅返回相关的信息。
下面,我们将看到如何定义不同的输入和输出模式。
API Reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# Define the schema for the input
class InputState(TypedDict):
question: str
# Define the schema for the output
class OutputState(TypedDict):
answer: str
# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
pass
# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
# Example answer and an extra key
return {"answer": "bye", "question": state["question"]}
# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node) # Add the answer node
builder.add_edge(START, "answer_node") # Define the starting edge
builder.add_edge("answer_node", END) # Define the ending edge
graph = builder.compile() # Compile the graph
# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
请注意,invoke 的输出仅包含输出模式。
在节点之间传递私有状态¶
在某些情况下,您可能希望节点之间交换对中间逻辑至关重要的信息,但这些信息不需要成为图的主模式的一部分。这种私有数据与图的整体输入/输出无关,且应仅在特定节点之间共享。
下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的示例顺序图,在前两个步骤(node_1 和 node_2)之间传递私有数据,而第三个步骤(node_3)只能访问公共的整体状态。
API Reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
a: str
# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
private_data: str
# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
output = {"private_data": "set by node_1"}
print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
private_data: str
def node_2(state: Node2Input) -> OverallState:
output = {"a": "set by node_2"}
print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
output = {"a": "set by node_3"}
print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
return output
# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()
# Invoke the graph with the initial state
response = graph.invoke(
{
"a": "set at start",
}
)
print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
Input: {'a': 'set at start'}.
Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
Input: {'private_data': 'set by node_1'}.
Returned: {'a': 'set by node_2'}
Entered node `node_3`:
Input: {'a': 'set by node_2'}.
Returned: {'a': 'set by node_3'}
Output of graph invocation: {'a': 'set by node_3'}
使用 Pydantic 模型进行图状态管理¶
StateGraph 在初始化时接受一个 state_schema
参数,用于指定图中节点可以访问和更新的状态的“形状”。
在我们的示例中,通常使用原生 Python 的 TypedDict
作为 state_schema
,但 state_schema
可以是任何 类型。
在这里,我们将看到如何使用 Pydantic BaseModel 作为 state_schema
,以便对 输入 进行运行时验证。
已知限制
- 当前,图的输出将 不会 是一个 pydantic 模型的实例。
- 运行时验证仅发生在节点的输入上,而不是输出上。
- 来自 pydantic 的验证错误追踪不会显示错误发生在哪一个节点中。
API Reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel
# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
a: str
def node(state: OverallState):
return {"a": "goodbye"}
# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node) # node_1 is the first node
builder.add_edge(START, "node") # Start the graph with node_1
builder.add_edge("node", END) # End the graph after node_1
graph = builder.compile()
# Test the graph with a valid input
graph.invoke({"a": "hello"})
用**无效**的输入调用图
try:
graph.invoke({"a": 123}) # Should be a string
except Exception as e:
print("An exception was raised because `a` is an integer rather than a string.")
print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
Input should be a valid string [type=string_type, input_value=123, input_type=int]
For further information visit https://errors.pydantic.dev/2.9/v/string_type
请参见下方 Pydantic 模型状态的附加功能:
序列化行为
在将 Pydantic 模型用作状态模式时,理解序列化的工作方式非常重要,特别是在以下情况:- 将 Pydantic 对象作为输入传递
- 从图中接收输出
- 处理嵌套的 Pydantic 模型
API Reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
class NestedModel(BaseModel):
value: str
class ComplexState(BaseModel):
text: str
count: int
nested: NestedModel
def process_node(state: ComplexState):
# Node receives a validated Pydantic object
print(f"Input state type: {type(state)}")
print(f"Nested type: {type(state.nested)}")
# Return a dictionary update
return {"text": state.text + " processed", "count": state.count + 1}
# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()
# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")
# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")
# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
运行时类型转换
Pydantic 对某些数据类型执行运行时类型转换。这可能会有帮助,但如果你不了解这一点,也可能导致意外的行为。API Reference: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
class CoercionExample(BaseModel):
# Pydantic will coerce string numbers to integers
number: int
# Pydantic will parse string booleans to bool
flag: bool
def inspect_node(state: CoercionExample):
print(f"number: {state.number} (type: {type(state.number)})")
print(f"flag: {state.flag} (type: {type(state.flag)})")
return {}
builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()
# Demonstrate coercion with string inputs that will be converted
result = graph.invoke({"number": "42", "flag": "true"})
# This would fail with a validation error
try:
graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
print(f"\nExpected validation error: {e}")
使用消息模型
在状态模式中使用 LangChain 消息类型时,序列化有一些重要的注意事项。当你通过网络传输消息对象时,应使用AnyMessage
(而不是 BaseMessage
),以确保正确的序列化和反序列化。
API Reference: StateGraph | START | END | HumanMessage | AIMessage | AnyMessage
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage
from typing import List
class ChatState(BaseModel):
messages: List[AnyMessage]
context: str
def add_message(state: ChatState):
return {"messages": state.messages + [AIMessage(content="Hello there!")]}
builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()
# Create input with a message
initial_state = ChatState(
messages=[HumanMessage(content="Hi")], context="Customer support chat"
)
result = graph.invoke(initial_state)
print(f"Output: {result}")
# Convert back to Pydantic model to see message types
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
print(f"Message {i}: {type(msg).__name__} - {msg.content}")
添加运行时配置¶
有时你希望在调用图时能够对其进行配置。例如,你可能希望在运行时指定使用哪个 LLM 或系统提示,而无需将这些参数污染图状态。
要添加运行时配置:
- 为你的配置指定一个模式
- 将配置添加到节点或条件边的函数签名中
- 将配置传递给图。
以下是一个简单的示例:
API Reference: RunnableConfig | END | StateGraph | START
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
# 1. Specify config schema
class ConfigSchema(TypedDict):
my_runtime_value: str
# 2. Define a graph that accesses the config in a node
class State(TypedDict):
my_state_value: str
def node(state: State, config: RunnableConfig):
if config["configurable"]["my_runtime_value"] == "a":
return {"my_state_value": 1}
elif config["configurable"]["my_runtime_value"] == "b":
return {"my_state_value": 2}
else:
raise ValueError("Unknown values.")
builder = StateGraph(State, config_schema=ConfigSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)
graph = builder.compile()
# 3. Pass in configuration at runtime:
print(graph.invoke({}, {"configurable": {"my_runtime_value": "a"}}))
print(graph.invoke({}, {"configurable": {"my_runtime_value": "b"}}))
扩展示例:在运行时指定LLM
下面我们将演示一个实际示例,展示如何在运行时配置使用哪个LLM。我们将使用OpenAI和Anthropic模型。import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
class ConfigSchema(TypedDict):
model: str
MODELS = {
"anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
"openai": init_chat_model("openai:gpt-4.1-mini"),
}
def call_model(state: MessagesState, config: RunnableConfig):
model = config["configurable"].get("model", "anthropic")
model = MODELS[model]
response = model.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)
graph = builder.compile()
# Usage
input_message = {"role": "user", "content": "hi"}
# With no configuration, uses default (Anthropic)
response_1 = graph.invoke({"messages": [input_message]})["messages"][-1]
# Or, can set OpenAI
config = {"configurable": {"model": "openai"}}
response_2 = graph.invoke({"messages": [input_message]}, config=config)["messages"][-1]
print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
扩展示例:在运行时指定模型和系统消息
下面我们将演示一个实际示例,其中我们配置两个参数:在运行时使用的LLM和系统消息。import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, MessagesState, StateGraph, START
from typing_extensions import TypedDict
class ConfigSchema(TypedDict):
model: Optional[str]
system_message: Optional[str]
MODELS = {
"anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
"openai": init_chat_model("openai:gpt-4.1-mini"),
}
def call_model(state: MessagesState, config: RunnableConfig):
model = config["configurable"].get("model", "anthropic")
model = MODELS[model]
messages = state["messages"]
if system_message := config["configurable"].get("system_message"):
messages = [SystemMessage(system_message)] + messages
response = model.invoke(messages)
return {"messages": [response]}
builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)
graph = builder.compile()
# Usage
input_message = {"role": "user", "content": "hi"}
config = {"configurable": {"model": "openai", "system_message": "Respond in Italian."}}
response = graph.invoke({"messages": [input_message]}, config)
for message in response["messages"]:
message.pretty_print()
添加重试策略¶
在许多情况下,你可能希望你的节点具有自定义的重试策略,例如当你调用一个 API、查询数据库或调用 LLM 等。LangGraph 允许你为节点添加重试策略。
要配置重试策略,请将 retry_policy
参数传递给 add_node。retry_policy
参数接受一个名为 RetryPolicy
的元组对象。下面,我们使用默认参数实例化一个 RetryPolicy
对象,并将其与一个节点关联:
from langgraph.pregel import RetryPolicy
builder.add_node(
"node_name",
node_function,
retry_policy=RetryPolicy(),
)
默认情况下,retry_on
参数使用 default_retry_on
函数,该函数会在发生任何异常时重试,除了以下这些:
ValueError
TypeError
ArithmeticError
ImportError
LookupError
NameError
SyntaxError
RuntimeError
ReferenceError
StopIteration
StopAsyncIteration
OSError
此外,对于来自流行 HTTP 请求库(如 requests
和 httpx
)的异常,它仅在 5xx 状态码时进行重试。
扩展示例:自定义重试策略
考虑一个从 SQL 数据库中读取数据的示例。下面我们在节点中传递了两种不同的重试策略:API Reference: init_chat_model | END | StateGraph | START | SQLDatabase | AIMessage
import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage
db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")
def query_database(state: MessagesState):
query_result = db.run("SELECT * FROM Artist LIMIT 10;")
return {"messages": [AIMessage(content=query_result)]}
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Define a new graph
builder = StateGraph(MessagesState)
builder.add_node(
"query_database",
query_database,
retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()
添加节点缓存¶
在需要避免重复操作的情况下(例如执行耗时或成本较高的操作),节点缓存非常有用。LangGraph 允许你为图中的节点添加个性化的缓存策略。
要配置缓存策略,请将 cache_policy
参数传递给 add_node 函数。在下面的示例中,使用一个 CachePolicy
对象,设置其生存时间为 120 秒,并使用默认的 key_func
生成器。然后将其与一个节点关联:
from langgraph.types import CachePolicy
builder.add_node(
"node_name",
node_function,
cache_policy=CachePolicy(ttl=120),
)
然后,要为图启用节点级别的缓存,请在编译图时设置 cache
参数。下面的示例使用 InMemoryCache
来设置一个具有内存缓存的图,但也可以使用 SqliteCache
。
创建一个步骤序列¶
前提条件
本指南假定您已经了解了上面关于state的部分。
在这里,我们演示如何构建一个简单的步骤序列。我们将展示:
- 如何构建一个顺序图
- 构建类似图的内置快捷方式。
要添加一系列节点,我们使用graph的.add_node
和.add_edge
方法:
API Reference: START | StateGraph
from langgraph.graph import START, StateGraph
builder = StateGraph(State)
# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)
# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
我们也可以使用内置的快捷方式.add_sequence
:
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
为什么要在LangGraph中将应用程序步骤拆分为一个序列?
LangGraph使为您的应用程序添加底层持久化层变得容易。 这允许在节点执行之间检查点状态,因此您的LangGraph节点控制: 它们还决定了执行步骤如何流式传输,以及如何使用LangGraph Studio可视化和调试您的应用程序。让我们演示一个端到端的示例。我们将创建一个包含三个步骤的序列:
- 在状态的键中填充一个值
- 更新相同的值
- 填充一个不同的值
首先,我们定义我们的state。它决定了图的模式,还可以指定如何应用更新。有关更多详细信息,请参阅this section。
在我们的情况下,我们只需跟踪两个值:
我们的 nodes 只是读取图状态并对其进行更新的 Python 函数。该函数的第一个参数始终是状态:
def step_1(state: State):
return {"value_1": "a"}
def step_2(state: State):
current_value_1 = state["value_1"]
return {"value_1": f"{current_value_1} b"}
def step_3(state: State):
return {"value_2": 10}
Note
请注意,当对状态进行更新时,每个节点只需指定它希望更新的键的值即可。
默认情况下,这将**覆盖**相应键的值。您也可以使用reducers来控制更新的处理方式——例如,您可以将连续的更新追加到某个键上。有关更多细节,请参阅此部分。
最后,我们定义图。我们使用StateGraph来定义一个操作此状态的图。
然后,我们将使用add_node和add_edge来填充我们的图并定义其控制流。
API Reference: START | StateGraph
from langgraph.graph import START, StateGraph
builder = StateGraph(State)
# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)
# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
请注意:
.add_edge
接收节点的名称,对于函数,默认使用node.__name__
。- 我们必须指定图的入口点。为此,我们添加一条边,使用 START 节点。
- 当没有更多节点需要执行时,图将停止。
接下来我们编译我们的图。这会对图的结构进行一些基本检查(例如,识别孤立的节点)。如果我们通过 checkpointer 向应用程序添加持久化功能,它也会在此处传入。
LangGraph 提供了用于可视化您的图的内置工具。让我们检查一下我们的序列。有关可视化的详细信息,请参阅此指南。
让我们从一个简单的调用开始:
请注意:
- 我们通过为单个状态键提供一个值来启动调用。我们必须始终至少为一个键提供一个值。
- 我们传入的值被第一个节点覆盖了。
- 第二个节点更新了该值。
- 第三个节点填充了一个不同的值。
内置简写方式
langgraph>=0.2.46
包含了一个内置的简写方式 add_sequence
,用于添加节点序列。你可以按照如下方式编译相同的图:
创建分支¶
节点的并行执行对于加快整体图操作至关重要。LangGraph 提供了对节点并行执行的原生支持,这可以显著提升基于图的工作流程性能。这种并行化通过分发(fan-out)和合并(fan-in)机制实现,使用标准边和条件边。以下是一些示例,展示如何创建适合你的分支数据流。
并行运行图节点¶
在这个示例中,我们从 Node A
分发到 B 和 C
,然后合并到 D
。使用我们的状态,我们指定了 reducer 的加法操作。这将为状态中的特定键组合或累积值,而不是简单地覆盖现有值。对于列表来说,这意味着将新列表与现有列表连接起来。有关使用 reducer 更新状态的更多细节,请参阅上面关于 状态 reducer 的部分。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
使用 reducer,你可以看到每个节点中添加的值会被累积。
Note
在上面的示例中,节点 "b"
和 "c"
在同一个 superstep 中并发执行。由于它们处于同一步骤中,节点 "d"
会在 "b"
和 "c"
都完成后执行。
重要的是,来自并行 superstep 的更新可能不会保持一致的顺序。如果你需要对来自并行 superstep 的更新进行一致且预定的排序,你应该将输出写入状态中的一个单独字段,并附带一个用于排序的值。
异常处理?
LangGraph 在 "supersteps" 内部执行节点,这意味着虽然并行分支是并行执行的,但整个 superstep 是 事务性的。如果其中任何一个分支抛出异常,所有 更新都不会被应用到状态(整个 superstep 出错)。
重要的是,当使用 checkpointer 时,superstep 中成功节点的结果会被保存,并且在恢复时不会重复。
如果你有容易出错的节点(例如希望处理不稳定的 API 调用),LangGraph 提供了两种方法来解决这个问题:- 你可以在节点中编写常规的 Python 代码来捕获和处理异常。
- 你可以设置一个 retry_policy,让图重试引发特定类型异常的节点。只有失败的分支会被重试,因此你无需担心重复工作。
延迟节点执行¶
当您希望延迟某个节点的执行,直到所有其他待处理任务完成时,延迟节点执行会很有用。这在分支长度不一致的工作流中尤其相关,例如 map-reduce 类型的工作流。
上面的例子展示了如何在每个路径只有一步的情况下进行扇出和扇入。但如果其中一个分支包含多个步骤呢?让我们在 "b"
分支中添加一个节点 "b_2"
:
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def b_2(state: State):
print(f'Adding "B_2" to {state["aggregate"]}')
return {"aggregate": ["B_2"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']
在上面的示例中,节点 "b"
和 "c"
在同一个超步中并发执行。我们在节点 d
上设置 defer=True
,这样它将在所有待处理任务完成之前不会执行。在这种情况下,这意味着 "d"
会一直等待,直到整个 "b"
分支完成。
条件分支¶
如果你的扇出需要根据运行时的状态变化,可以使用 add_conditional_edges 通过图状态选择一个或多个路径。下面是一个示例,其中节点 a
生成一个状态更新以确定下一个节点。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
# Add a key to the state. We will set this key to determine
# how we branch.
which: str
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"], "which": "c"}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)
def conditional_edge(state: State) -> Literal["b", "c"]:
# Fill in arbitrary logic here that uses the state
# to determine the next node
return state["which"]
builder.add_conditional_edges("a", conditional_edge)
graph = builder.compile()
Tip
你的条件边可以路由到多个目标节点。例如:
Map-reduce 和 Send
API¶
默认情况下,Nodes
和 Edges
是在事先定义好的,并且操作的是同一个共享状态。然而,在某些情况下,确切的边可能无法提前知道,或者你可能希望在同一时间存在不同版本的状态。这种情形的一个常见例子是使用 map-reduce 设计模式。在这种设计模式中,第一个节点可能会生成一个对象列表,你可能想要将另一个节点应用到所有这些对象上。对象的数量可能在事先是未知的(这意味着边的数量也可能未知),并且下游 Node
的输入状态应该是不同的(每个生成的对象一个)。
为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send
接受两个参数:第一个是节点名称,第二个是要传递给该节点的状态。
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
graph.add_conditional_edges("node_a", continue_to_jokes)
下面我们实现了一个简单的示例,模拟使用 LLMs 来 (1) 生成一个主题列表(其长度在事先未知),(2) 并行生成笑话,以及 (3) 选择一个“最佳”笑话。重要的是,分发节点的输入状态与图的整体状态是不同的。
API Reference: Send | END | StateGraph | START
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
topic: str
subjects: list
# Notice here we use the operator.add
# This is because we want combine all the jokes we generate
# from individual nodes back into one list - this is essentially
# the "reduce" part
jokes: Annotated[list, operator.add]
best_selected_joke: str
# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
subject: str
# This is the function we will use to generate the subjects of the jokes.
# In general the length of the list generated by this node could vary each run.
def generate_topics(state: OverallState):
# Simulate a LLM.
return {"subjects": ["lions", "elephants", "penguins"]}
# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
# Simulate a LLM.
joke_map = {
"lions": "Why don't lions like fast food? Because they can't catch it!",
"elephants": "Why don't elephants use computers? They're afraid of the mouse!",
"penguins": (
"Why don’t penguins like talking to strangers at parties? "
"Because they find it hard to break the ice."
),
}
return {"jokes": [joke_map[state["subject"]]]}
# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
# We will return a list of `Send` objects
# Each `Send` object consists of the name of a node in the graph
# as well as the state to send to that node
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
# Here we will judge the best joke
def best_joke(state: OverallState):
return {"best_selected_joke": "penguins"}
# Construct the graph: here we put everything together to construct our graph
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don’t penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}
创建和控制循环¶
当创建一个包含循环的图时,我们需要一种机制来终止执行。最常见的方法是添加一条条件边,一旦达到某个终止条件,该边将引导至END节点。
在调用或流式传输图时,你也可以设置图的递归限制。递归限制设置了图在引发错误前允许执行的超级步骤(supersteps)数量。了解更多关于递归限制的概念,请参阅此处。
让我们考虑一个带有循环的简单图,以更好地理解这些机制的工作方式。
Tip
如果你想返回状态的最后一个值而不是收到递归限制错误,请参阅下一节。
在创建循环时,你可以包含一条指定终止条件的条件边:
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
def route(state: State) -> Literal["b", END]:
if termination_condition(state):
return END
else:
return "b"
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
要控制递归限制,请在配置中指定"recursion_limit"
。这将引发一个GraphRecursionError
,你可以捕获并处理它:
from langgraph.errors import GraphRecursionError
try:
graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
print("Recursion Error")
让我们定义一个带有简单循环的图。请注意,我们使用了一个条件边来实现终止条件。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
# Define edges
def route(state: State) -> Literal["b", END]:
if len(state["aggregate"]) < 7:
return "b"
else:
return END
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
此架构类似于ReAct agent,其中节点 "a"
是一个调用工具的模型,节点 "b"
表示工具。
在我们的 route
条件边中,我们指定当状态中的 "aggregate"
列表长度超过阈值时应结束。
调用图时,我们会交替在节点 "a"
和 "b"
之间运行,直到满足终止条件后才终止。
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
设置递归限制¶
在某些应用中,我们可能无法保证一定会达到某个终止条件。在这种情况下,我们可以设置图的递归限制。这样,在经过一定数量的超级步骤后,会抛出一个GraphRecursionError
。我们之后可以捕获并处理这个异常:
from langgraph.errors import GraphRecursionError
try:
graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
print("Recursion Error")
请注意,这次我们在第四步之后终止。默认的递归限制是25。
扩展示例:在达到递归限制时返回状态
与引发GraphRecursionError
不同,我们可以向状态中引入一个新的键,用于跟踪到达递归限制前剩余的步骤数。然后,我们可以使用这个键来判断是否应该结束运行。
LangGraph实现了一个特殊的RemainingSteps
注解。在内部,它会创建一个ManagedValue
通道——一个状态通道,它将在我们的图运行期间存在,之后将不再存在。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
remaining_steps: RemainingSteps
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
# Define edges
def route(state: State) -> Literal["b", END]:
if state["remaining_steps"] <= 2:
return END
else:
return "b"
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
扩展示例:带有分支的循环
为了更好地理解递归限制的工作方式,让我们考虑一个更复杂的例子。下面我们将实现一个循环,但其中一步会分叉成两个节点:API Reference: StateGraph | START | END
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Node A sees {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Node B sees {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Node C sees {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Node D sees {state["aggregate"]}')
return {"aggregate": ["D"]}
# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
# Define edges
def route(state: State) -> Literal["b", END]:
if len(state["aggregate"]) < 7:
return "b"
else:
return END
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']
异步¶
使用 async 编程范式可以在并发执行 IO-bound 代码时显著提高性能(例如,同时向聊天模型提供商发出多个 API 请求)。
要将 sync
的图实现转换为 async
实现,您需要:
- 将
nodes
使用async def
替代def
。 - 在代码中适当使用
await
。 - 根据需求使用
.ainvoke
或.astream
调用图。
由于许多 LangChain 对象实现了 Runnable Protocol,它提供了所有 sync
方法的 async
变体,因此通常可以快速将 sync
图升级为 async
图。
请看下面的例子。为了演示底层 LLM 的异步调用,我们将包含一个聊天模型:
{}¶
import os
from langchain.chat_models import init_chat_model
os.environ["AZURE_OPENAI_API_KEY"] = "..."
os.environ["AZURE_OPENAI_ENDPOINT"] = "..."
os.environ["OPENAI_API_VERSION"] = "2025-03-01-preview"
llm = init_chat_model(
"azure_openai:gpt-4.1",
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
)
API Reference: init_chat_model | StateGraph
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph
async def node(state: MessagesState): # (1)!
new_message = await llm.ainvoke(state["messages"]) # (2)!
return {"messages": [new_message]}
builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()
input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]}) # (3)!
- 声明节点为异步函数。
- 在节点内使用可用的异步调用。
- 在图对象本身上使用异步调用。
异步流
有关异步流的示例,请参阅 流指南。
使用 Command
组合控制流和状态更新¶
将控制流(边)和状态更新(节点)结合起来可能会很有用。例如,你可能希望在同一个节点中同时执行状态更新并决定下一步前往哪个节点。LangGraph 通过从节点函数返回 Command 对象来实现这一点:
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
# 状态更新
update={"foo": "bar"},
# 控制流
goto="my_other_node"
)
下面我们将展示一个端到端的示例。让我们创建一个包含 3 个节点的简单图:A、B 和 C。我们首先会执行节点 A,然后根据节点 A 的输出决定下一步是前往节点 B 还是节点 C。
API Reference: StateGraph | START | Command
import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command
# Define graph state
class State(TypedDict):
foo: str
# Define the nodes
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
print("Called A")
value = random.choice(["a", "b"])
# this is a replacement for a conditional edge function
if value == "a":
goto = "node_b"
else:
goto = "node_c"
# note how Command allows you to BOTH update the graph state AND route to the next node
return Command(
# this is the state update
update={"foo": value},
# this is a replacement for an edge
goto=goto,
)
def node_b(state: State):
print("Called B")
return {"foo": state["foo"] + "b"}
def node_c(state: State):
print("Called C")
return {"foo": state["foo"] + "c"}
我们现在可以使用上述节点创建 StateGraph
了。请注意,该图中没有条件边用于路由!这是因为控制流是在 node_a
内部通过 Command
定义的。
builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!
graph = builder.compile()
Important
你可能已经注意到,我们使用了 Command
作为返回类型注解,例如 Command[Literal["node_b", "node_c"]]
。这对于图的渲染是必要的,并且告诉 LangGraph node_a
可以导航到 node_b
和 node_c
。
如果我们多次运行该图,我们会看到它根据节点 A 中的随机选择采取不同的路径(A -> B 或 A -> C)。
转到父图中的节点¶
如果你正在使用子图,你可能需要从子图中的一个节点导航到另一个子图(即父图中的不同节点)。为此,你可以在 Command
中指定 graph=Command.PARENT
:
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
update={"foo": "bar"},
goto="other_subgraph", # 其中 `other_subgraph` 是父图中的一个节点
graph=Command.PARENT
)
让我们以上面的例子来演示这一点。我们将通过把上面例子中的 node_a
改为一个单节点图,并将其作为子图添加到我们的主图中。
使用 Command.PARENT
的状态更新
当你从子图节点向主图节点发送共享键的状态更新时(该键在父图和子图的state schemas中都存在),你**必须**在父图状态中为你要更新的键定义一个reducer。请参阅下面的例子。
import operator
from typing_extensions import Annotated
class State(TypedDict):
# NOTE: we define a reducer here
foo: Annotated[str, operator.add]
def node_a(state: State):
print("Called A")
value = random.choice(["a", "b"])
# this is a replacement for a conditional edge function
if value == "a":
goto = "node_b"
else:
goto = "node_c"
# note how Command allows you to BOTH update the graph state AND route to the next node
return Command(
update={"foo": value},
goto=goto,
# this tells LangGraph to navigate to node_b or node_c in the parent graph
# NOTE: this will navigate to the closest parent graph relative to the subgraph
graph=Command.PARENT,
)
subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()
def node_b(state: State):
print("Called B")
# NOTE: since we've defined a reducer, we don't need to manually append
# new characters to existing 'foo' value. instead, reducer will append these
# automatically (via operator.add)
return {"foo": "b"}
def node_c(state: State):
print("Called C")
return {"foo": "c"}
builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)
graph = builder.compile()
在工具内部使用¶
一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的账户号或ID查找客户信息。为了从工具中更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})
:
@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
"""使用此工具来查找用户信息,以便更好地回答他们的问题。"""
user_info = get_user_info(config.get("configurable", {}).get("user_id"))
return Command(
update={
# 更新状态键
"user_info": user_info,
# 更新消息历史
"messages": [ToolMessage("成功查找了用户信息", tool_call_id=tool_call_id)]
}
)
Important
当从工具返回 Command
时,您**必须**在 Command.update
中包含 messages
(或用于消息历史的任何状态键),并且 messages
列表中**必须**包含一个 ToolMessage
。这是确保最终消息历史有效的必要条件(LLM 提供商要求 AI 消息中的工具调用后必须跟随工具结果消息)。
如果您使用的工具通过 Command
更新状态,我们建议使用预构建的 ToolNode
,它会自动处理返回 Command
对象的工具,并将其传播到图状态中。如果您正在编写一个自定义节点以调用工具,则需要手动将工具返回的 Command
对象作为节点的更新进行传播。
可视化你的图¶
在这里我们演示如何可视化你创建的图。
你可以可视化任何任意的Graph,包括StateGraph。让我们通过绘制分形来玩点有趣的 :).
API Reference: StateGraph | START | END | add_messages
import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
class MyNode:
def __init__(self, name: str):
self.name = name
def __call__(self, state: State):
return {"messages": [("assistant", f"Called node {self.name}")]}
def route(state) -> Literal["entry_node", "__end__"]:
if len(state["messages"]) > 10:
return "__end__"
return "entry_node"
def add_fractal_nodes(builder, current_node, level, max_level):
if level > max_level:
return
# Number of nodes to create at this level
num_nodes = random.randint(1, 3) # Adjust randomness as needed
for i in range(num_nodes):
nm = ["A", "B", "C"][i]
node_name = f"node_{current_node}_{nm}"
builder.add_node(node_name, MyNode(node_name))
builder.add_edge(current_node, node_name)
# Recursively add more nodes
r = random.random()
if r > 0.2 and level + 1 < max_level:
add_fractal_nodes(builder, node_name, level + 1, max_level)
elif r > 0.05:
builder.add_conditional_edges(node_name, route, node_name)
else:
# End
builder.add_edge(node_name, "__end__")
def build_fractal_graph(max_level: int):
builder = StateGraph(State)
entry_point = "entry_node"
builder.add_node(entry_point, MyNode(entry_point))
builder.add_edge(START, entry_point)
add_fractal_nodes(builder, entry_point, 1, max_level)
# Optional: set a finish point if required
builder.add_edge(entry_point, END) # or any specific node
return builder.compile()
app = build_fractal_graph(3)
Mermaid¶
我们也可以将图表类转换为 Mermaid 语法。
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
__start__([<p>__start__</p>]):::first
entry_node(entry_node)
node_entry_node_A(node_entry_node_A)
node_entry_node_B(node_entry_node_B)
node_node_entry_node_B_A(node_node_entry_node_B_A)
node_node_entry_node_B_B(node_node_entry_node_B_B)
node_node_entry_node_B_C(node_node_entry_node_B_C)
__end__([<p>__end__</p>]):::last
__start__ --> entry_node;
entry_node --> __end__;
entry_node --> node_entry_node_A;
entry_node --> node_entry_node_B;
node_entry_node_B --> node_node_entry_node_B_A;
node_entry_node_B --> node_node_entry_node_B_B;
node_entry_node_B --> node_node_entry_node_B_C;
node_entry_node_A -.-> entry_node;
node_entry_node_A -.-> __end__;
node_node_entry_node_B_A -.-> entry_node;
node_node_entry_node_B_A -.-> __end__;
node_node_entry_node_B_B -.-> entry_node;
node_node_entry_node_B_B -.-> __end__;
node_node_entry_node_B_C -.-> entry_node;
node_node_entry_node_B_C -.-> __end__;
classDef default fill:#f2f0ff,line-height:1.2
classDef first fill-opacity:0
classDef last fill:#bfb6fc
PNG¶
如果需要,我们可以将图表渲染为 .png
格式。这里我们有三种选择:
- 使用 Mermaid.ink API(不需要额外的包)
- 使用 Mermaid + Pyppeteer(需要
pip install pyppeteer
) - 使用 graphviz(需要
pip install graphviz
)
使用 Mermaid.Ink
默认情况下,draw_mermaid_png()
使用 Mermaid.Ink 的 API 生成图表。
API Reference: CurveStyle | MermaidDrawMethod | NodeStyles
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles
display(Image(app.get_graph().draw_mermaid_png()))
使用 Mermaid + Pyppeteer
import nest_asyncio
nest_asyncio.apply() # Required for Jupyter Notebook to run async functions
display(
Image(
app.get_graph().draw_mermaid_png(
curve_style=CurveStyle.LINEAR,
node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
wrap_label_n_words=9,
output_file_path=None,
draw_method=MermaidDrawMethod.PYPPETEER,
background_color="white",
padding=10,
)
)
)
使用 Graphviz
try:
display(Image(app.get_graph().draw_png()))
except ImportError:
print(
"You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
)