Mastering LangGraph-Stream

LangGraph has first-class streaming support built in.
Streaming graph outputs
.stream and .astream are the synchronous and asynchronous methods for streaming outputs back from a graph run. When calling these methods, one or more modes can be specified via the stream_mode parameter (for example, `graph.stream(…, stream_mode="…")`):
  • values: Streams the full value of the state after each step of the graph.
  • updates: Streams only the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes run), they are streamed separately. (The difference from values is shown in the sketch after this list.)
  • custom: Streams custom data emitted from inside graph nodes (sketched at the end of this section).
  • messages: Streams LLM tokens and metadata for graph nodes that call an LLM.
  • debug: Streams as much information as possible throughout the execution of the graph.
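To make the first two modes concrete, here is a minimal sketch. It assumes the compiled graph app and the imports from the full example later in this post; the input dict is illustrative:
# "updates" yields one dict per state update, keyed by the node that produced it
for chunk in app.stream({"messages": [HumanMessage("Hello")]}, stream_mode="updates"):
    print(chunk)  # e.g. {'call_model': {'messages': [AIMessage(...)]}}

# "values" yields the full accumulated state after each step
for state in app.stream({"messages": [HumanMessage("Hello")]}, stream_mode="values"):
    print(state)  # e.g. {'messages': [HumanMessage(...), AIMessage(...)]}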

Multiple streaming modes can also be specified simultaneously by passing a list. In that case, each streamed output is a (stream_mode, data) tuple. For example:
graph.stream(..., stream_mode=["updates", "messages"])
...
('messages', (AIMessageChunk(content='Hi'), {'langgraph_step': 3, 'langgraph_node': 'agent', ...}))
...
('updates', {'agent': {'messages': [AIMessage(content="Hi, how can I help you?")]}})
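In practice you unpack each tuple and dispatch on the mode. A minimal sketch, again assuming the graph and input from the full example below:
for mode, data in app.stream({"messages": [HumanMessage("Hello")]},
                             stream_mode=["updates", "messages"]):
    if mode == "messages":
        token_chunk, metadata = data  # (AIMessageChunk, metadata dict)
        print(token_chunk.content, end="", flush=True)
    elif mode == "updates":
        print(data)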
The visualization below shows the difference between the values and updates modes:
[Figure: values mode streams the full state after each step, while updates mode streams only the per-step deltas]
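The custom mode deserves a quick sketch as well. This assumes a recent langgraph release that provides get_stream_writer; the node name and payload here are hypothetical:
from langgraph.config import get_stream_writer

def my_node(state: MessagesState):
    # get_stream_writer() returns a callable bound to the current graph run
    writer = get_stream_writer()
    writer({"progress": "calling the model"})  # surfaces under stream_mode="custom"
    return {"messages": model.invoke(state["messages"])}

# For a graph built around such a node, consuming the custom data looks like:
for chunk in app.stream({"messages": [HumanMessage("Hello")]}, stream_mode="custom"):
    print(chunk)  # e.g. {'progress': 'calling the model'}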
Streaming LLM tokens and events
Additionally, events occurring inside nodes can be streamed using the astream_events method. This is very useful for streaming tokens from LLM calls.
astream_events is a standard method on all LangChain Runnable objects. This means that certain events are emitted along the way as the graph executes, and they can be observed by running the graph with .astream_events.
All events have (among other things) event, name, and data fields.
  • event: The type of event being emitted. A detailed table of all callback events and triggers can be found in the LangChain docs (https://python.langchain.com/docs/concepts/#callback-events).
  • name: The name of the component emitting the event.
  • data: The data associated with the event.
What types of things would cause an event to occur?
  • Each node (runnable) emits on_chain_start when it begins execution, on_chain_stream as it streams output, and on_chain_end when it completes. Node events include the node name in the event's name field.
  • The graph itself emits on_chain_start at the start of execution, on_chain_stream after each node finishes, and on_chain_end when the whole graph completes. Graph events include LangGraph in the event's name field.
  • Any write to a state channel (i.e., any time the value of some state key is updated) emits on_chain_start and on_chain_end events; these appear as _write in the trace below.
Additionally, any events created within nodes (LLM events, tool events, manually emitted events, etc.) will also be visible in the output of .astream_events.
To make this concrete, let's look at the events returned when running a simple graph:
from langchain_ollama import ChatOllama
import base_conf
from langgraph.graph import StateGraph, MessagesState, START, END
import asyncio
from langchain_core.messages import HumanMessage

model = ChatOllama(base_url=base_conf.base_url,
                   model=base_conf.model_name,
                   temperature=base_conf.temperature)

# A single node that calls the model with the accumulated messages
def call_model(state: MessagesState):
    response = model.invoke(state['messages'])
    return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()

# Print the type and emitter name of every event the run produces
async def run(app):
    async for event in app.astream_events({"messages": [HumanMessage("Hello")]}, version="v2"):
        kind = event["event"]
        print(f"{kind}: {event['name']}")

asyncio.run(run(app))
on_chain_start: LangGraph
on_chain_start: __start__
on_chain_start: _write
on_chain_end: _write
on_chain_start: _write
on_chain_end: _write
on_chain_start: _write
on_chain_end: _write
on_chain_stream: __start__
on_chain_end: __start__
on_chain_start: call_model
on_chat_model_start: ChatOllama
on_chat_model_stream: ChatOllama
on_chat_model_stream: ChatOllama
on_chat_model_stream: ChatOllama
on_chat_model_stream: ChatOllama
on_chat_model_stream: ChatOllama
on_chat_model_end: ChatOllama
on_chain_start: _write
on_chain_end: _write
on_chain_stream: call_model
on_chain_end: call_model
on_chain_stream: LangGraph
on_chain_end: LangGraph
Each type of event contains differently formatted data. Let’s see what the on_chat_model_stream event looks like. This is an important event type because it is necessary for streaming tokens in the LLM response.

Here is what one of these events looks like (this particular sample comes from a ChatOpenAI run, hence the ChatOpenAI name and OpenAI metadata):

{
    'event': 'on_chat_model_stream',
    'name': 'ChatOpenAI',
    'run_id': '3fdbf494-acce-402e-9b50-4eab46403859',
    'tags': ['seq:step:1'],
    'metadata': {
        'langgraph_step': 1,
        'langgraph_node': 'call_model',
        'langgraph_triggers': ['start:call_model'],
        'langgraph_task_idx': 0,
        'checkpoint_id': '1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',
        'checkpoint_ns': 'call_model',
        'ls_provider': 'openai',
        'ls_model_name': 'gpt-4o-mini',
        'ls_model_type': 'chat',
        'ls_temperature': 0.7
    },
    'data': {
        'chunk': AIMessageChunk(content='Hello', id='run-3fdbf494-acce-402e-9b50-4eab46403859')
    },
    'parent_ids': []
}
We can see that we have the event type and name (which we already knew).
We also get a lot of useful metadata. Notably, 'langgraph_node': 'call_model' tells us which node the model is being called from.
Finally, data is a very important field: it contains the actual payload of the event. In this case it is an AIMessageChunk, which carries the content of the message as well as an id. This id belongs to the overall AIMessage (not just this chunk), which is very useful: it lets us track which chunks belong to the same message (and thus display them together in the UI).
This information contains everything needed to create a UI for streaming LLM tokens.
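Putting this together, here is a minimal sketch of a token-streaming loop over the graph built above (the print-based "UI" is purely illustrative):
async def stream_tokens(app):
    async for event in app.astream_events({"messages": [HumanMessage("Hello")]}, version="v2"):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            # chunk.id is shared by every chunk of one AIMessage, so a real UI
            # could group chunks by id; here we simply print tokens as they arrive
            print(chunk.content, end="", flush=True)

asyncio.run(stream_tokens(app))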
