如何为并行执行创建map-reduce分支¶
Map-reduce 操作对于高效的任务分解和平行处理至关重要。这种做法涉及将任务分解为更小的子任务,在并行处理每个子任务后,再将所有完成的子任务的结果汇总起来。
考虑这样一个例子:给定用户提供的一个通用主题,生成一组相关的主题,为每个主题生成一个笑话,并从生成的列表中选择最佳的笑话。在这种设计模式中,第一个节点可能会生成一组对象(例如,相关主题),我们希望将另一个节点(例如,生成笑话)应用于所有这些对象(例如,主题)。然而,这里出现了两个主要的挑战。
(1) 对象的数量(例如,主题)在事先可能是未知的(这意味着边的数量可能是未知的),当我们在布局图时,(2) 输入状态到下游节点应该是不同的(每个生成的对象一个)。
LangGraph 通过其 Send
API 解决了这些挑战。通过利用条件边,Send
可以将不同的状态(例如,主题)分发给多个节点实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,这允许灵活和动态的工作流管理。
设置¶
首先,让我们安装所需的包并设置API密钥
import os
import getpass
def _set_env(name: str):
if not os.getenv(name):
os.environ[name] = getpass.getpass(f"{name}: ")
_set_env("ANTHROPIC_API_KEY")
使用LangSmith进行LangGraph开发
注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的内容,请参阅这里。
定义图¶
使用 Pydantic 与 LangChain
本笔记本使用 Pydantic v2 BaseModel
,这需要 langchain-core >= 0.3
。使用 langchain-core < 0.3
将会导致错误,因为 Pydantic v1 和 v2 的 BaseModels
混合使用。
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langchain_anthropic import ChatAnthropic
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
from pydantic import BaseModel, Field
# Model and prompts
# Define model and prompts we will use
subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.
{jokes}"""
class Subjects(BaseModel):
subjects: list[str]
class Joke(BaseModel):
joke: str
class BestJoke(BaseModel):
id: int = Field(description="Index of the best joke, starting with 0", ge=0)
model = ChatAnthropic(model="claude-3-5-sonnet-20240620")
# Graph components: define the components that will make up the graph
# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
topic: str
subjects: list
# Notice here we use the operator.add
# This is because we want combine all the jokes we generate
# from individual nodes back into one list - this is essentially
# the "reduce" part
jokes: Annotated[list, operator.add]
best_selected_joke: str
# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
subject: str
# This is the function we will use to generate the subjects of the jokes
def generate_topics(state: OverallState):
prompt = subjects_prompt.format(topic=state["topic"])
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}
# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}
# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
# We will return a list of `Send` objects
# Each `Send` object consists of the name of a node in the graph
# as well as the state to send to that node
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
# Here we will judge the best joke
def best_joke(state: OverallState):
jokes = "\n\n".join(state["jokes"])
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
response = model.with_structured_output(BestJoke).invoke(prompt)
return {"best_selected_joke": state["jokes"][response.id]}
# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()
API Reference: ChatAnthropic | Send | END | StateGraph | START
使用图表¶
# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
print(s)
{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}}
{'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}