Skip to content

如何为并行执行创建map-reduce分支

先决条件

本指南假设您熟悉以下内容:

Map-reduce 操作对于高效的任务分解和平行处理至关重要。这种做法涉及将任务分解为更小的子任务,在并行处理每个子任务后,再将所有完成的子任务的结果汇总起来。

考虑这样一个例子:给定用户提供的一个通用主题,生成一组相关的主题,为每个主题生成一个笑话,并从生成的列表中选择最佳的笑话。在这种设计模式中,第一个节点可能会生成一组对象(例如,相关主题),我们希望将另一个节点(例如,生成笑话)应用于所有这些对象(例如,主题)。然而,这里出现了两个主要的挑战。

(1) 对象的数量(例如,主题)在事先可能是未知的(这意味着边的数量可能是未知的),当我们在布局图时,(2) 输入状态到下游节点应该是不同的(每个生成的对象一个)。

LangGraph 通过其 Send API 解决了这些挑战。通过利用条件边,Send 可以将不同的状态(例如,主题)分发给多个节点实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,这允许灵活和动态的工作流管理。

Screenshot 2024-07-12 at 9.45.40 AM.png

设置

首先,让我们安装所需的包并设置API密钥

%%capture --no-stderr
%pip install -U langchain-anthropic langgraph
import os
import getpass


def _set_env(name: str):
    if not os.getenv(name):
        os.environ[name] = getpass.getpass(f"{name}: ")


_set_env("ANTHROPIC_API_KEY")

使用LangSmith进行LangGraph开发

注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的内容,请参阅这里

定义图

使用 Pydantic 与 LangChain

本笔记本使用 Pydantic v2 BaseModel,这需要 langchain-core >= 0.3。使用 langchain-core < 0.3 将会导致错误,因为 Pydantic v1 和 v2 的 BaseModels 混合使用。

import operator
from typing import Annotated
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic

from langgraph.types import Send
from langgraph.graph import END, StateGraph, START

from pydantic import BaseModel, Field

# Model and prompts
# Define model and prompts we will use
subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.

{jokes}"""


class Subjects(BaseModel):
    subjects: list[str]


class Joke(BaseModel):
    joke: str


class BestJoke(BaseModel):
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
    topic: str
    subjects: list
    # Notice here we use the operator.add
    # This is because we want combine all the jokes we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    jokes: Annotated[list, operator.add]
    best_selected_joke: str


# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
    subject: str


# This is the function we will use to generate the subjects of the jokes
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}


# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}


# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


# Here we will judge the best joke
def best_joke(state: OverallState):
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    return {"best_selected_joke": state["jokes"][response.id]}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()

API Reference: ChatAnthropic | Send | END | StateGraph | START

from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

使用图表

# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
    print(s)
{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}}
{'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}

Comments