Practical AI Agent: Implementing Persistence and Streaming with LangGraph

Click πŸ‘‡πŸ» to follow, article from

πŸ™‹β™‚οΈ Friends who want to join the community can check the method at the end of the article to communicate in the group.

β€œ When building an AI Agent system, persistence and streaming are two key concepts. Persistence allows us to save the Agent’s state at any time so that we can recover from that state in future interactions, which is crucial for long-running applications. Streaming, on the other hand, can provide real-time signals about the Agent’s current operations, offering transparency and control over its behavior. This article will implement these powerful capabilities for AI Agents step by step using LangGraph.”

Practical AI Agent: Implementing Persistence and Streaming with LangGraph

Hello everyone, I am Si Ling Qi, and today I want to talk about how to add persistence and streaming capabilities to AI Agents using LangGraph. This is a key step in making AI Agents smarter and more practical!

1. Why Do We Need Persistence and Streaming

When building an AI Agent, persistence and streaming are two very important concepts. Imagine if your Agent suddenly interrupts while performing a long task, all previous states would be lost, which would be incredibly frustrating! Persistence can solve this problem; it allows the Agent’s state to be saved so you can continue from where it left off. Streaming, on the other hand, acts like a “real-time monitor” for the Agent, allowing you to see what it is doing at any time, which is crucial for applications that require real-time feedback!

2. Setting Up the Agent

First, we need to recreate an Agent. This step mainly involves loading necessary environment variables, installing and importing required libraries, setting up the Tavily search tool, defining the Agent’s state, and finally building the Agent.

1. Install Required Libraries

Before we start, we need to install some necessary libraries. Open your terminal and run the following command:

pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1

2. Set Environment Variables

We need to set some environment variables so that the Agent can access the Tavily and Groq APIs. Add the following code:

import os
os.environ['TAVILY_API_KEY'] = "<tavily_api_key>"  # Replace with your Tavily API key
os.environ['GROQ_API_KEY'] = "<groq_api_key>"      # Replace with your Groq API key</groq_api_key></tavily_api_key>

3. Import Necessary Modules

Next, we need to import some necessary modules:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

4. Set Up the Tavily Search Tool

Tavily is a powerful search tool that we can use to provide search capabilities for the Agent. Set it up as follows:

tool = TavilySearchResults(max_results=2)

5. Define the Agent State

We need to define an Agent state to store the Agent’s message history:

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

6. Build the Agent Class

Now, let’s build the Agent class. This class will define the Agent’s behavior and state management:

class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

3. Adding Persistence Functionality

To achieve persistence, we use LangGraph’s checkpointer feature. Here we are using SqliteSaver, which is based on SQLite database, simple and practical.

1. Set Up SqliteSaver

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)

2. Modify the Agent Class to Support Persistence

We need to modify the Agent class to accept a checkpointer parameter:

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Other code remains unchanged
        self.graph = graph.compile(checkpointer=checkpointer)

3. Create an Agent with Persistence Functionality

Now we can create an Agent with persistence functionality:

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow-up question, you are allowed to do that!"""
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)

4. Adding Streaming Functionality

Streaming is divided into two types: streaming messages and streaming tokens. Let’s first look at streaming messages.

1. Streaming Messages

Streaming messages allow us to see the Agent’s operations in real-time. We can achieve this with the following code:

messages = [HumanMessage(content="What is the weather in Texas?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

After running, you will see the Agent output the process of searching for the weather in real-time, finally giving an answer. Isn’t it cool?

2. Understanding Thread IDs

thread_id is an important concept in streaming. It allows the Agent to maintain multiple independent conversation threads. For example, we can continue to ask about the weather using the same thread_id:

messages = [HumanMessage(content="What about in LA?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

The Agent will directly respond to the weather in LA based on the previous conversation. However, if you change the thread_id, the Agent will ask you to provide more context because it won’t know what you are talking about.

3. Streaming Tokens

Streaming tokens allow you to see the Agent’s thought process in real-time. We use the astream_events method to achieve this:

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")

In this way, you can see the Agent outputting answers one word at a time, as if chatting with you.

By adding persistence and streaming, our AI Agent becomes more powerful. Persistence allows the Agent to remember previous conversation content, while streaming lets us understand the Agent’s operations in real-time. These features are almost essential for building production-level applications.

References

  • β€’ AI agents in langgraphhttps://learn.deeplearning.ai/courses/ai-agents-in-langgraph

Welcome to like, to watch, and follow. Follow the public account to not miss out on the wonderful content⭐️

I am Si Ling Qi🐝, an internet practitioner passionate about AI. Here, I share my observations, thoughts, and insights. I hope to inspire those who love AI, technology, and life through my self-exploration process.

Looking forward to our unexpected encounters. Click πŸ‘‡πŸ» to follow

πŸ™‹β™‚οΈ Join the group for communication 1. Click on the “Community” in the public account menu to scan the QR code to join the group. 2. Reply “join group” or “add group” to add the author’s WeChat to join the group.

Leave a Comment