工作流与智能体¶
本指南回顾了智能体系统的常见模式。在描述这些系统时,区分“工作流”和“智能体”会很有帮助。Anthropic 的《构建高效智能体》博客文章很好地解释了这种区别:
工作流是通过预定义代码路径编排大语言模型和工具的系统。 另一方面,智能体是大语言模型动态指导自身流程和工具使用,对如何完成任务保持控制的系统。
以下是一种直观展示这些差异的简单方式:
在构建智能体和工作流时,LangGraph 提供了许多优势,包括持久化、流式处理,以及对调试和部署的支持。
设置¶
你可以使用任何支持结构化输出和工具调用的聊天模型。下面,我们展示为 Anthropic 安装包、设置 API 密钥以及测试结构化输出/工具调用的过程。
初始化一个大语言模型(LLM)
import os
import getpass
from langchain_anthropic import ChatAnthropic
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
构建模块:增强型大语言模型¶
大语言模型具备支持构建工作流和智能体的增强功能。这些功能包括结构化输出和工具调用,如Anthropic博客中《构建高效智能体》一文的这张图片所示:
# 结构化输出的模式
from pydantic import BaseModel, Field
class SearchQuery(BaseModel):
search_query: str = Field(None, description="经过优化以用于网络搜索的查询。")
justification: str = Field(
None, description="说明此查询与用户请求相关的原因。"
)
# 用结构化输出的模式增强大语言模型
structured_llm = llm.with_structured_output(SearchQuery)
# 调用增强后的大语言模型
output = structured_llm.invoke("钙CT评分与高胆固醇有什么关系?")
# 定义一个工具
def multiply(a: int, b: int) -> int:
return a * b
# 用工具增强大语言模型
llm_with_tools = llm.bind_tools([multiply])
# 调用大语言模型并输入触发工具调用的内容
msg = llm_with_tools.invoke("2乘以3等于多少?")
# 获取工具调用信息
msg.tool_calls
提示链¶
在提示链中,每次大语言模型(LLM)调用都会处理上一次调用的输出。
正如Anthropic博客文章《构建有效的智能体》中所提到的:
提示链将一个任务分解为一系列步骤,其中每次大语言模型调用都会处理上一次调用的输出。你可以在任何中间步骤添加编程检查(见下图中的“门控”),以确保流程仍在正轨上。
何时使用此工作流:当任务可以轻松、清晰地分解为固定的子任务时,此工作流是理想之选。主要目标是通过让每次大语言模型调用处理更简单的任务,用延迟换取更高的准确性。
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# 图状态
class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str
# 节点
def generate_joke(state: State):
"""第一次大语言模型调用,生成初始笑话"""
msg = llm.invoke(f"Write a short joke about {state['topic']}")
return {"joke": msg.content}
def check_punchline(state: State):
"""门控函数,检查笑话是否有笑点"""
# 简单检查 - 笑话是否包含 "?" 或 "!"
if "?" in state["joke"] or "!" in state["joke"]:
return "Fail"
return "Pass"
def improve_joke(state: State):
"""第二次大语言模型调用,改进笑话"""
msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
return {"improved_joke": msg.content}
def polish_joke(state: State):
"""第三次大语言模型调用,进行最终润色"""
msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
return {"final_joke": msg.content}
# 构建工作流
workflow = StateGraph(State)
# 添加节点
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
# 添加边以连接节点
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)
# 编译
chain = workflow.compile()
# 显示工作流
display(Image(chain.get_graph().draw_mermaid_png()))
# 调用
state = chain.invoke({"topic": "cats"})
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
print("Improved joke:")
print(state["improved_joke"])
print("\n--- --- ---\n")
print("Final joke:")
print(state["final_joke"])
else:
print("Joke failed quality gate - no punchline detected!")
LangSmith追踪
https://smith.langchain.com/public/a0281fca-3a71-46de-beee-791468607b75/r
资源:
LangChain学院
请参阅我们关于提示链的课程此处。
from langgraph.func import entrypoint, task
# 任务
@task
def generate_joke(topic: str):
"""第一次大语言模型调用,生成初始笑话"""
msg = llm.invoke(f"Write a short joke about {topic}")
return msg.content
def check_punchline(joke: str):
"""门控函数,检查笑话是否有笑点"""
# 简单检查 - 笑话是否包含 "?" 或 "!"
if "?" in joke or "!" in joke:
return "Fail"
return "Pass"
@task
def improve_joke(joke: str):
"""第二次大语言模型调用,改进笑话"""
msg = llm.invoke(f"Make this joke funnier by adding wordplay: {joke}")
return msg.content
@task
def polish_joke(joke: str):
"""第三次大语言模型调用,进行最终润色"""
msg = llm.invoke(f"Add a surprising twist to this joke: {joke}")
return msg.content
@entrypoint()
def parallel_workflow(topic: str):
original_joke = generate_joke(topic).result()
if check_punchline(original_joke) == "Pass":
return original_joke
improved_joke = improve_joke(original_joke).result()
return polish_joke(improved_joke).result()
# 调用
for step in parallel_workflow.stream("cats", stream_mode="updates"):
print(step)
print("\n")
LangSmith追踪
https://smith.langchain.com/public/332fa4fc-b6ca-416e-baa3-161625e69163/r
并行处理¶
通过并行处理,大语言模型(LLMs)可以同时处理一项任务:
大语言模型有时可以同时处理一项任务,并通过编程方式聚合它们的输出。这种工作流程,即并行处理,主要有两种变体:分段处理:将一项任务分解为独立的子任务并并行运行。投票机制:多次运行相同的任务以获得多样化的输出。
何时使用此工作流程:当划分后的子任务可以并行处理以提高速度时,或者当需要多个视角或多次尝试以获得更可靠的结果时,并行处理非常有效。对于需要综合考虑多个因素的复杂任务,通常让每个大语言模型调用处理一个单独的因素,这样大语言模型在每个具体方面都能更专注,从而表现得更好。
# 图状态
class State(TypedDict):
topic: str
joke: str
story: str
poem: str
combined_output: str
# 节点
def call_llm_1(state: State):
"""第一次调用大语言模型生成初始笑话"""
msg = llm.invoke(f"Write a joke about {state['topic']}")
return {"joke": msg.content}
def call_llm_2(state: State):
"""第二次调用大语言模型生成故事"""
msg = llm.invoke(f"Write a story about {state['topic']}")
return {"story": msg.content}
def call_llm_3(state: State):
"""第三次调用大语言模型生成诗歌"""
msg = llm.invoke(f"Write a poem about {state['topic']}")
return {"poem": msg.content}
def aggregator(state: State):
"""将笑话、故事和诗歌合并为一个输出"""
combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
combined += f"STORY:\n{state['story']}\n\n"
combined += f"JOKE:\n{state['joke']}\n\n"
combined += f"POEM:\n{state['poem']}"
return {"combined_output": combined}
# 构建工作流
parallel_builder = StateGraph(State)
# 添加节点
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
# 添加边以连接节点
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()
# 显示工作流
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))
# 调用
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])
LangSmith 跟踪
https://smith.langchain.com/public/3be2e53c-ca94-40dd-934f-82ff87fac277/r
资源:
文档
请参阅我们关于并行处理的文档 此处。
LangChain 学院
请参阅我们关于并行处理的课程 此处。
@task
def call_llm_1(topic: str):
"""第一次调用大语言模型生成初始笑话"""
msg = llm.invoke(f"Write a joke about {topic}")
return msg.content
@task
def call_llm_2(topic: str):
"""第二次调用大语言模型生成故事"""
msg = llm.invoke(f"Write a story about {topic}")
return msg.content
@task
def call_llm_3(topic):
"""第三次调用大语言模型生成诗歌"""
msg = llm.invoke(f"Write a poem about {topic}")
return msg.content
@task
def aggregator(topic, joke, story, poem):
"""将笑话、故事和诗歌合并为一个输出"""
combined = f"Here's a story, joke, and poem about {topic}!\n\n"
combined += f"STORY:\n{story}\n\n"
combined += f"JOKE:\n{joke}\n\n"
combined += f"POEM:\n{poem}"
return combined
# 构建工作流
@entrypoint()
def parallel_workflow(topic: str):
joke_fut = call_llm_1(topic)
story_fut = call_llm_2(topic)
poem_fut = call_llm_3(topic)
return aggregator(
topic, joke_fut.result(), story_fut.result(), poem_fut.result()
).result()
# 调用
for step in parallel_workflow.stream("cats", stream_mode="updates"):
print(step)
print("\n")
LangSmith 跟踪
https://smith.langchain.com/public/623d033f-e814-41e9-80b1-75e6abb67801/r
路由¶
路由对输入进行分类,并将其导向后续任务。正如Anthropic博客文章《构建高效智能体》中所述:
路由对输入进行分类,并将其导向专门的后续任务。这种工作流程允许关注点分离,并构建更具针对性的提示。如果没有这种工作流程,针对某一类输入进行优化可能会损害对其他输入的处理性能。
何时使用此工作流程:当面对复杂任务,且存在不同类别,这些类别分开处理效果更佳,并且可以通过大语言模型(LLM)或更传统的分类模型/算法进行准确分类时,路由工作流程会很有效。
from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage
# 用于路由逻辑的结构化输出模式
class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="路由过程中的下一步"
)
# 为大语言模型添加结构化输出模式
router = llm.with_structured_output(Route)
# 状态
class State(TypedDict):
input: str
decision: str
output: str
# 节点
def llm_call_1(state: State):
"""编写一个故事"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_2(state: State):
"""编写一个笑话"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_3(state: State):
"""编写一首诗"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_router(state: State):
"""将输入路由到合适的节点"""
# 运行带有结构化输出的增强大语言模型,作为路由逻辑
decision = router.invoke(
[
SystemMessage(
content="根据用户的请求,将输入路由到故事、笑话或诗歌。"
),
HumanMessage(content=state["input"]),
]
)
return {"decision": decision.step}
# 条件边函数,用于路由到合适的节点
def route_decision(state: State):
# 返回你想要访问的下一个节点的名称
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"
# 构建工作流
router_builder = StateGraph(State)
# 添加节点
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
# 添加边以连接节点
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
"llm_call_router",
route_decision,
{ # route_decision 返回的名称 : 要访问的下一个节点的名称
"llm_call_1": "llm_call_1",
"llm_call_2": "llm_call_2",
"llm_call_3": "llm_call_3",
},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
# 编译工作流
router_workflow = router_builder.compile()
# 展示工作流
display(Image(router_workflow.get_graph().draw_mermaid_png()))
# 调用
state = router_workflow.invoke({"input": "给我写一个关于猫的笑话"})
print(state["output"])
LangSmith追踪
https://smith.langchain.com/public/c4580b74-fe91-47e4-96fe-7fac598d509c/r
资源:
LangChain学院
请参阅我们关于路由的课程此处。
示例
from typing_extensions import Literal
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, SystemMessage
# 用于路由逻辑的结构化输出模式
class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="路由过程中的下一步"
)
# 为大语言模型添加结构化输出模式
router = llm.with_structured_output(Route)
@task
def llm_call_1(input_: str):
"""编写一个故事"""
result = llm.invoke(input_)
return result.content
@task
def llm_call_2(input_: str):
"""编写一个笑话"""
result = llm.invoke(input_)
return result.content
@task
def llm_call_3(input_: str):
"""编写一首诗"""
result = llm.invoke(input_)
return result.content
def llm_call_router(input_: str):
"""将输入路由到合适的节点"""
# 运行带有结构化输出的增强大语言模型,作为路由逻辑
decision = router.invoke(
[
SystemMessage(
content="根据用户的请求,将输入路由到故事、笑话或诗歌。"
),
HumanMessage(content=input_),
]
)
return decision.step
# 创建工作流
@entrypoint()
def router_workflow(input_: str):
next_step = llm_call_router(input_)
if next_step == "story":
llm_call = llm_call_1
elif next_step == "joke":
llm_call = llm_call_2
elif next_step == "poem":
llm_call = llm_call_3
return llm_call(input_).result()
# 调用
for step in router_workflow.stream("给我写一个关于猫的笑话", stream_mode="updates"):
print(step)
print("\n")
LangSmith追踪
https://smith.langchain.com/public/5e2eb979-82dd-402c-b1a0-a8cceaf2a28a/r
协调器 - 工作器¶
使用协调器 - 工作器模式时,协调器会将一个任务分解,并将每个子任务委派给工作器。正如 Anthropic 博客《构建高效智能体》中所提到的:
在协调器 - 工作器工作流中,一个中央大语言模型(LLM)会动态地分解任务,将它们委派给工作器大语言模型,并综合它们的结果。
何时使用此工作流:此工作流非常适合那些无法预先预测所需子任务的复杂任务(例如在编码中,需要更改的文件数量以及每个文件的更改性质可能取决于具体任务)。虽然它在拓扑结构上与并行化类似,但关键区别在于其灵活性——子任务不是预先定义的,而是由协调器根据具体输入来确定。
from typing import Annotated, List
import operator
# 用于规划的结构化输出模式
class Section(BaseModel):
name: str = Field(
description="报告此部分的名称。",
)
description: str = Field(
description="此部分要涵盖的主要主题和概念的简要概述。",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="报告的各个部分。",
)
# 为大语言模型添加结构化输出模式
planner = llm.with_structured_output(Sections)
在 LangGraph 中创建工作器
由于协调器 - 工作器工作流很常见,LangGraph 提供了 Send
API 来支持这种模式。它允许你动态创建工作器节点,并向每个节点发送特定的输入。每个工作器都有自己的状态,所有工作器的输出都会写入一个 共享状态键,协调器图可以访问该键。这使得协调器能够访问所有工作器的输出,并将它们综合成最终输出。如下所示,我们遍历一个部分列表,并将每个部分 Send
到一个工作器节点。更多文档请参考此处和此处。
from langgraph.constants import Send
# 图状态
class State(TypedDict):
topic: str # 报告主题
sections: list[Section] # 报告部分列表
completed_sections: Annotated[
list, operator.add
] # 所有工作器并行写入此键
final_report: str # 最终报告
# 工作器状态
class WorkerState(TypedDict):
section: Section
completed_sections: Annotated[list, operator.add]
# 节点
def orchestrator(state: State):
"""生成报告计划的协调器"""
# 生成查询
report_sections = planner.invoke(
[
SystemMessage(content="生成报告计划。"),
HumanMessage(content=f"这是报告主题: {state['topic']}"),
]
)
return {"sections": report_sections.sections}
def llm_call(state: WorkerState):
"""工作器撰写报告的一个部分"""
# 生成部分内容
section = llm.invoke(
[
SystemMessage(
content="根据提供的名称和描述撰写报告部分。每个部分不包含前言。使用 Markdown 格式。"
),
HumanMessage(
content=f"这是部分名称: {state['section'].name} 和描述: {state['section'].description}"
),
]
)
# 将更新后的部分写入已完成部分
return {"completed_sections": [section.content]}
def synthesizer(state: State):
"""从各个部分综合生成完整报告"""
# 已完成部分列表
completed_sections = state["completed_sections"]
# 将已完成部分格式化为字符串,用作最终部分的上下文
completed_report_sections = "\n\n---\n\n".join(completed_sections)
return {"final_report": completed_report_sections}
# 条件边函数,用于创建每个撰写报告一个部分的 llm_call 工作器
def assign_workers(state: State):
"""为计划中的每个部分分配一个工作器"""
# 通过 Send() API 并行启动部分撰写
return [Send("llm_call", {"section": s}) for s in state["sections"]]
# 构建工作流
orchestrator_worker_builder = StateGraph(State)
# 添加节点
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
# 添加边以连接节点
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
"orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)
# 编译工作流
orchestrator_worker = orchestrator_worker_builder.compile()
# 显示工作流
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))
# 调用
state = orchestrator_worker.invoke({"topic": "撰写一份关于大语言模型缩放定律的报告"})
from IPython.display import Markdown
Markdown(state["final_report"])
LangSmith 跟踪
https://smith.langchain.com/public/78cbcfc3-38bf-471d-b62a-b299b144237d/r
资源:
LangChain 学院
请参考我们关于协调器 - 工作器的课程此处。
示例
from typing import List
# 用于规划的结构化输出模式
class Section(BaseModel):
name: str = Field(
description="报告此部分的名称。",
)
description: str = Field(
description="此部分要涵盖的主要主题和概念的简要概述。",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="报告的各个部分。",
)
# 为大语言模型添加结构化输出模式
planner = llm.with_structured_output(Sections)
@task
def orchestrator(topic: str):
"""生成报告计划的协调器"""
# 生成查询
report_sections = planner.invoke(
[
SystemMessage(content="生成报告计划。"),
HumanMessage(content=f"这是报告主题: {topic}"),
]
)
return report_sections.sections
@task
def llm_call(section: Section):
"""工作器撰写报告的一个部分"""
# 生成部分内容
result = llm.invoke(
[
SystemMessage(content="撰写报告的一个部分。"),
HumanMessage(
content=f"这是部分名称: {section.name} 和描述: {section.description}"
),
]
)
# 将更新后的部分写入已完成部分
return result.content
@task
def synthesizer(completed_sections: list[str]):
"""从各个部分综合生成完整报告"""
final_report = "\n\n---\n\n".join(completed_sections)
return final_report
@entrypoint()
def orchestrator_worker(topic: str):
sections = orchestrator(topic).result()
section_futures = [llm_call(section) for section in sections]
final_report = synthesizer(
[section_fut.result() for section_fut in section_futures]
).result()
return final_report
# 调用
report = orchestrator_worker.invoke("撰写一份关于大语言模型缩放定律的报告")
from IPython.display import Markdown
Markdown(report)
LangSmith 跟踪
https://smith.langchain.com/public/75a636d0-6179-4a12-9836-e0aa571e87c5/r
评估器 - 优化器¶
在评估器 - 优化器工作流程中,一次大语言模型(LLM)调用生成一个响应,而另一次调用则循环提供评估和反馈:
在评估器 - 优化器工作流程中,一次大语言模型(LLM)调用生成一个响应,而另一次调用则循环提供评估和反馈。
何时使用此工作流程:当我们有明确的评估标准,并且迭代改进能带来可衡量的价值时,此工作流程特别有效。适用的两个迹象是,首先,当人类明确给出反馈时,大语言模型的响应能得到明显改善;其次,大语言模型能够提供此类反馈。这类似于人类作家在撰写一篇润色后的文档时可能经历的迭代写作过程。
# 图状态
class State(TypedDict):
joke: str
topic: str
feedback: str
funny_or_not: str
# 用于评估的结构化输出模式
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="判断这个笑话是否有趣。",
)
feedback: str = Field(
description="如果笑话不好笑,提供如何改进的反馈。",
)
# 为大语言模型增加结构化输出模式
evaluator = llm.with_structured_output(Feedback)
# 节点
def llm_call_generator(state: State):
"""大语言模型生成一个笑话"""
if state.get("feedback"):
msg = llm.invoke(
f"写一个关于 {state['topic']} 的笑话,但要考虑反馈:{state['feedback']}"
)
else:
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
return {"joke": msg.content}
def llm_call_evaluator(state: State):
"""大语言模型评估这个笑话"""
grade = evaluator.invoke(f"给这个笑话 {state['joke']} 打分")
return {"funny_or_not": grade.grade, "feedback": grade.feedback}
# 条件边函数,根据评估器的反馈决定是回到笑话生成器还是结束流程
def route_joke(state: State):
"""根据评估器的反馈决定是回到笑话生成器还是结束流程"""
if state["funny_or_not"] == "funny":
return "Accepted"
elif state["funny_or_not"] == "not funny":
return "Rejected + Feedback"
# 构建工作流程
optimizer_builder = StateGraph(State)
# 添加节点
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)
# 添加边来连接节点
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
"llm_call_evaluator",
route_joke,
{ # route_joke 返回的名称 : 下一个要访问的节点名称
"Accepted": END,
"Rejected + Feedback": "llm_call_generator",
},
)
# 编译工作流程
optimizer_workflow = optimizer_builder.compile()
# 展示工作流程
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))
# 调用
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])
LangSmith 跟踪
https://smith.langchain.com/public/86ab3e60-2000-4bff-b988-9b89a3269789/r
资源:
示例
# 用于评估的结构化输出模式
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="判断这个笑话是否有趣。",
)
feedback: str = Field(
description="如果笑话不好笑,提供如何改进的反馈。",
)
# 为大语言模型增加结构化输出模式
evaluator = llm.with_structured_output(Feedback)
# 节点
@task
def llm_call_generator(topic: str, feedback: Feedback):
"""大语言模型生成一个笑话"""
if feedback:
msg = llm.invoke(
f"写一个关于 {topic} 的笑话,但要考虑反馈:{feedback}"
)
else:
msg = llm.invoke(f"写一个关于 {topic} 的笑话")
return msg.content
@task
def llm_call_evaluator(joke: str):
"""大语言模型评估这个笑话"""
feedback = evaluator.invoke(f"给这个笑话 {joke} 打分")
return feedback
@entrypoint()
def optimizer_workflow(topic: str):
feedback = None
while True:
joke = llm_call_generator(topic, feedback).result()
feedback = llm_call_evaluator(joke).result()
if feedback.grade == "funny":
break
return joke
# 调用
for step in optimizer_workflow.stream("Cats", stream_mode="updates"):
print(step)
print("\n")
LangSmith 跟踪
https://smith.langchain.com/public/f66830be-4339-4a6b-8a93-389ce5ae27b4/r
代理¶
代理通常被实现为一个大语言模型(LLM),它基于环境反馈在循环中执行操作(通过工具调用)。正如Anthropic博客文章《构建有效的代理》中所提到的:
代理可以处理复杂的任务,但其实现通常很直接。它们通常只是基于环境反馈在循环中使用工具的大语言模型。因此,清晰且周全地设计工具集及其文档至关重要。
何时使用代理:代理适用于那些难以或无法预测所需步骤数量,且无法硬编码固定路径的开放性问题。大语言模型可能会进行多轮操作,你必须在一定程度上信任其决策能力。代理的自主性使其非常适合在可信任的环境中扩展任务。
from langchain_core.tools import tool
# 定义工具
@tool
def multiply(a: int, b: int) -> int:
"""将a和b相乘。
参数:
a: 第一个整数
b: 第二个整数
"""
return a * b
@tool
def add(a: int, b: int) -> int:
"""将a和b相加。
参数:
a: 第一个整数
b: 第二个整数
"""
return a + b
@tool
def divide(a: int, b: int) -> float:
"""将a除以b。
参数:
a: 第一个整数
b: 第二个整数
"""
return a / b
# 为大语言模型添加工具
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
API Reference: tool
from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage
# 节点
def llm_call(state: MessagesState):
"""大语言模型决定是否调用工具"""
return {
"messages": [
llm_with_tools.invoke(
[
SystemMessage(
content="你是一个有用的助手,负责对一组输入进行算术运算。"
)
]
+ state["messages"]
)
]
}
def tool_node(state: dict):
"""执行工具调用"""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
# 条件边函数,根据大语言模型是否进行了工具调用,决定路由到工具节点还是结束
def should_continue(state: MessagesState) -> Literal["environment", END]:
"""根据大语言模型是否进行了工具调用,决定是否继续循环或停止"""
messages = state["messages"]
last_message = messages[-1]
# 如果大语言模型进行了工具调用,则执行操作
if last_message.tool_calls:
return "Action"
# 否则,我们停止(回复用户)
return END
# 构建工作流
agent_builder = StateGraph(MessagesState)
# 添加节点
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)
# 添加边以连接节点
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
# should_continue返回的名称 : 下一个要访问的节点名称
"Action": "environment",
END: END,
},
)
agent_builder.add_edge("environment", "llm_call")
# 编译代理
agent = agent_builder.compile()
# 展示代理
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))
# 调用
messages = [HumanMessage(content="将3和4相加。")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
m.pretty_print()
LangSmith跟踪
https://smith.langchain.com/public/051f0391-6761-4f8c-a53b-22231b016690/r
资源:
LangChain学院
请在此处查看我们关于代理的课程。
示例
此处有一个使用工具调用代理来创建/存储长期记忆的项目。
from langgraph.graph import add_messages
from langchain_core.messages import (
SystemMessage,
HumanMessage,
BaseMessage,
ToolCall,
)
@task
def call_llm(messages: list[BaseMessage]):
"""大语言模型决定是否调用工具"""
return llm_with_tools.invoke(
[
SystemMessage(
content="你是一个有用的助手,负责对一组输入进行算术运算。"
)
]
+ messages
)
@task
def call_tool(tool_call: ToolCall):
"""执行工具调用"""
tool = tools_by_name[tool_call["name"]]
return tool.invoke(tool_call)
@entrypoint()
def agent(messages: list[BaseMessage]):
llm_response = call_llm(messages).result()
while True:
if not llm_response.tool_calls:
break
# 执行工具
tool_result_futures = [
call_tool(tool_call) for tool_call in llm_response.tool_calls
]
tool_results = [fut.result() for fut in tool_result_futures]
messages = add_messages(messages, [llm_response, *tool_results])
llm_response = call_llm(messages).result()
messages = add_messages(messages, llm_response)
return messages
# 调用
messages = [HumanMessage(content="将3和4相加。")]
for chunk in agent.stream(messages, stream_mode="updates"):
print(chunk)
print("\n")
LangSmith跟踪
https://smith.langchain.com/public/42ae8bf9-3935-4504-a081-8ddbcbfc8b2e/r
预构建¶
LangGraph还提供了一个**预构建方法**来创建上述定义的代理(使用create_react_agent
函数):
https://langchain-ai.github.io/langgraph/how-tos/create-react-agent/
from langgraph.prebuilt import create_react_agent
# 传入:
# (1) 带有工具的增强大语言模型
# (2) 工具列表(用于创建工具节点)
pre_built_agent = create_react_agent(llm, tools=tools)
# 展示代理
display(Image(pre_built_agent.get_graph().draw_mermaid_png()))
# 调用
messages = [HumanMessage(content="将3和4相加。")]
messages = pre_built_agent.invoke({"messages": messages})
for m in messages["messages"]:
m.pretty_print()
LangSmith跟踪
https://smith.langchain.com/public/abab6a44-29f6-4b97-8164-af77413e494d/r
LangGraph 提供的功能¶
通过在 LangGraph 中构建上述各项内容,我们可以获得以下几方面的能力:
持久化:人工干预¶
LangGraph 持久化层支持对操作进行中断和审批(例如,人工干预)。请参阅 LangChain 学院的模块 3。
持久化:记忆¶
LangGraph 持久化层支持对话(短期)记忆和长期记忆。请参阅 LangChain 学院的模块 2 和模块 5:
流式处理¶
LangGraph 提供了多种方式来流式传输工作流/代理的输出或中间状态。请参阅 LangChain 学院的模块 3。
部署¶
LangGraph 为部署、可观测性和评估提供了便捷的途径。请参阅 LangChain 学院的模块 6。