Skip to content

如何从您的图中流式传输LLM令牌

前提条件

本指南假设您熟悉以下内容:

在使用LangGraph构建LLM应用程序时,您可能希望从LangGraph节点中的LLM调用中流式传输单个LLM令牌。您可以使用graph.stream(..., stream_mode="messages")来实现:

from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
def call_model(state: State):
    model.invoke(...)
    ...

graph = (
    StateGraph(State)
    .add_node(call_model)
    ...
    .compile()

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    print(msg)

流式传输的输出将是包含(消息片段, 元数据)的元组:

  • 消息片段是LLM流式传输的令牌
  • 元数据是一个字典,包含有关调用LLM的图节点的信息以及LLM调用元数据

不使用LangChain

如果您需要**不使用LangChain**流式传输LLM令牌,可以使用stream_mode="custom"直接从LLM提供商客户端流式传输输出。查看下面的示例以了解更多信息。

Python < 3.11中的异步

当使用Python < 3.11与异步代码时,请确保在调用模型时手动传递RunnableConfig,如下所示:model.ainvoke(..., config)。 流式传输方法通过作为回调传递的流式传输跟踪器收集您嵌套代码中的所有事件。在3.11及以上版本中,这是通过contextvars自动处理的;在3.11之前,asyncio的任务缺乏适当的contextvar支持,这意味着回调将仅在您手动传递配置时传播。我们在下面的call_model函数中这样做。

环境搭建

首先,我们需要安装所需的包

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai

接下来,我们需要为OpenAI(我们将使用的大型语言模型)设置API密钥。

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

使用LangSmith进行LangGraph开发

注册LangSmith,可以快速发现并解决您的LangGraph项目中的问题,提高项目性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里

Note

注意,在下面的 call_model(state: State, config: RunnableConfig): 中,我们 a) 接受了节点函数中的 RunnableConfig,b) 并将其作为 model.ainvoke(..., config) 的第二个参数传递。这在 Python >= 3.11 中是可选的。

示例

下面我们演示了一个在一个节点中包含两个大语言模型(LLM)调用的例子。

from typing import TypedDict
from langgraph.graph import START, StateGraph, MessagesState
from langchain_openai import ChatOpenAI


# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini", tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini", tags=["poem"])


class State(TypedDict):
    topic: str
    joke: str
    poem: str


async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for python < 3.11
    # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()

API Reference: START | StateGraph | ChatOpenAI

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...
In| sun|lit| patches|,| sleek| and| sly|,|  
|Wh|isk|ers| twitch| as| shadows| fly|.|  
|With| velvet| paws| and| eyes| so| bright|,|  
|They| dance| through| dreams|,| both| day| and| night|.|  

|A| playful| p|ounce|,| a| gentle| p|urr|,|  
|In| every| leap|,| a| soft| allure|.|  
|Cur|led| in| warmth|,| a| silent| grace|,|  
|Each| furry| friend|,| a| warm| embrace|.|  

|Myst|ery| wrapped| in| fur| and| charm|,|  
|A| soothing| presence|,| a| gentle| balm|.|  
|In| their| gaze|,| the| world| slows| down|,|  
|For| in| their| realm|,| we're| all| ren|own|.|

metadata
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
 'checkpoint_ns': 'call_model:6ddc5f0f-1dd0-325d-3014-f949286ce595',
 'ls_provider': 'openai',
 'ls_model_name': 'gpt-4o-mini',
 'ls_model_type': 'chat',
 'ls_temperature': 0.7,
 'tags': ['poem']}

过滤到特定的大语言模型调用

你可以看到我们正在从所有LLM调用中流式传输令牌。现在让我们过滤这些流式传输的令牌,只包含特定的LLM调用。我们可以使用流式传输的元数据,并使用我们之前添加到LLMs的标签来过滤事件:

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content and "joke" in metadata.get("tags", []):
        print(msg.content, end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...

无LangChain示例

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )

    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


async def call_model(state, config, writer):
    topic = state["topic"]
    joke = ""
    poem = ""

    print("Writing joke...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a joke about {topic}"}]
    ):
        joke += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["joke"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    print("\n\nWriting poem...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a short poem about {topic}"}]
    ):
        poem += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["poem"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    return {"joke": joke, "poem": poem}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()

stream_mode=\"custom\"

在没有使用LangChain的情况下流式传输LLM令牌时,我们建议使用stream_mode="custom"。这允许您明确控制包含在LangGraph流式输出中的LLM提供商API数据,包括任何附加元数据。

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    print(msg["content"], end="|", flush=True)
Writing joke...
Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the|

Writing poem...
 mouse|!|In| sun|lit| patches|,| they| stretch| and| y|awn|,|  
|With| whispered| paws| at| the| break| of| dawn|.|  
|Wh|isk|ers| twitch| in| the| morning| light|,|  
|Sil|ken| shadows|,| a| graceful| sight|.|  

|The| gentle| p|urr|s|,| a| soothing| song|,|  
|In| a| world| of| comfort|,| where| they| belong|.|  
|M|yster|ious| hearts| wrapped| in| soft|est| fur|,|  
|F|eline| whispers| in| every| p|urr|.|  

|Ch|asing| dreams| on| a| moon|lit| chase|,|  
|With| a| flick| of| a| tail|,| they| glide| with| grace|.|  
|Oh|,| playful| spirits| of| whisk|ered| cheer|,|  
|In| your| quiet| company|,| the| world| feels| near|.|  |

metadata
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:3fa3fbe1-39d8-5209-dd77-0da38d4cc1c9',
 'tags': ['poem']}

要过滤到特定的大语言模型(LLM)调用,您可以使用流式元数据:

async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    if "poem" in metadata.get("tags", []):
        print(msg["content"], end="|", flush=True)
Writing joke...


Writing poem...
In| shadows| soft|,| they| weave| and| play|,|  
|With| whispered| paws|,| they| greet| the| day|.|  
|Eyes| like| lantern|s|,| bright| and| keen|,|  
|Guard|ians| of| secrets|,| unseen|,| serene|.|  

|They| twist| and| stretch| in| sun|lit| beams|,|  
|Ch|asing| the| echoes| of| half|-|formed| dreams|.|  
|With| p|urring| songs| that| soothe| the| night|,|  
|F|eline| spirits|,| pure| delight|.|  

|On| windows|ills|,| they| perch| and| stare|,|  
|Ad|vent|urers| bold| with| a| graceful| flair|.|  
|In| every| leap| and| playful| bound|,|  
|The| magic| of| cats|—|where| love| is| found|.|

Comments