In this post, we will walk through how to build an intelligent SQL query assistant using LangChain and LangGraph. The assistant converts natural language questions into SQL queries, executes them, and returns human-friendly answers. More importantly, it supports a human-in-the-loop workflow, letting users intervene at critical steps.
0. Initialize SQL Data
First, create a file named create_and_insert.sql:
CREATE TABLE users (
    name TEXT NOT NULL,
    age INTEGER NOT NULL,
    hobbies TEXT,
    profession TEXT
);
INSERT INTO users (name, age, hobbies, profession) VALUES
('Zhang San', 25, 'Basketball, Swimming', 'Software Engineer'),
('Li Si', 30, 'Reading, Traveling', 'Teacher'),
('Wang Wu', 22, 'Music, Gaming', 'Student'),
('Zhao Liu', 28, 'Painting, Photography', 'Designer'),
('Sun Qi', 35, 'Football, Running', 'Doctor'),
('Zhou Ba', 24, 'Dancing, Yoga', 'Fitness Coach'),
('Wu Jiu', 29, 'Movies, Food', 'Chef'),
('Zheng Shi', 27, 'Writing, Calligraphy', 'Writer'),
('Liu Yi', 32, 'Hiking, Fishing', 'Sales Manager'),
('Chen Er', 26, 'Singing, Watching Plays', 'Administrative Assistant');
Then execute it after installing sqlite3 locally:
sqlite3 test.db
.read create_and_insert.sql
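Before wiring up the assistant, you can sanity-check the schema with Python's built-in sqlite3 module. The snippet below uses a trimmed three-row version of the script against an in-memory database, so test.db is left untouched:

```python
import sqlite3

# Same schema as create_and_insert.sql, with the first three rows
ddl = """
CREATE TABLE users (
    name TEXT NOT NULL,
    age INTEGER NOT NULL,
    hobbies TEXT,
    profession TEXT
);
INSERT INTO users (name, age, hobbies, profession) VALUES
('Zhang San', 25, 'Basketball, Swimming', 'Software Engineer'),
('Li Si', 30, 'Reading, Traveling', 'Teacher'),
('Wang Wu', 22, 'Music, Gaming', 'Student');
"""

conn = sqlite3.connect(":memory:")
conn.executescript(ddl)
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 3 rows in this trimmed script
conn.close()
```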
1. Basic Setup and Dependency Import
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from typing_extensions import TypedDict, Annotated
# Initialize database connection
db = SQLDatabase.from_uri("sqlite:///test.db")
This code imports necessary dependencies and establishes a database connection. SQLDatabase is a utility provided by LangChain that seamlessly integrates database operations into the LLM workflow.
2. Define Application State
class State(TypedDict):
    question: str  # User's natural language question
    query: str     # Generated SQL query
    result: str    # SQL query result
    answer: str    # Final natural language answer
Using TypedDict to define the application state not only provides type hints but also clarifies the flow of data within the workflow.
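Each node in the graph returns only the keys it produces, and the graph merges that partial update into the shared state. Here is a rough, plain-Python illustration of that merge (a toy sketch; LangGraph's real state handling goes through reducers and checkpoints):

```python
from typing_extensions import TypedDict

class State(TypedDict, total=False):
    question: str
    query: str
    result: str
    answer: str

def merge(state: State, update: dict) -> State:
    # Shallow merge: the node's output overwrites matching keys,
    # everything else in the state is carried forward unchanged
    return {**state, **update}

state: State = {"question": "How many users are there?"}
state = merge(state, {"query": "SELECT COUNT(*) FROM users;"})  # write_query output
state = merge(state, {"result": "[(10,)]"})                     # execute_query output

assert state["question"] == "How many users are there?"  # earlier keys survive
assert state["query"].startswith("SELECT")
```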
3. Initialize Language Model
llm = ChatOpenAI(model="gpt-4o-mini")
query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")
We use OpenAI's GPT-4o-mini model and retrieve a predefined SQL query prompt template from LangChain Hub. This template is tuned to generate high-quality SQL queries.
4. Core Functionality Implementation
4.1 SQL Query Generation
class QueryOutput(TypedDict):
    """Generated SQL query."""

    query: Annotated[str, ..., "Syntactically valid SQL query."]

def write_query(state: State):
    """Convert the natural language question into a SQL query."""
    prompt = query_prompt_template.invoke({
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    })
    structured_llm = llm.with_structured_output(QueryOutput)
    result = structured_llm.invoke(prompt)
    return {"query": result["query"]}
This function converts the user's natural language question into a valid SQL query. It relies on:
• Structured output, which constrains the model to return a syntactically valid SQL string
• Database schema information, so the query references real tables and columns
• A predefined prompt template tuned for query generation
4.2 Query Execution
def execute_query(state: State):
    """Execute the SQL query."""
    execute_query_tool = QuerySQLDataBaseTool(db=db)
    return {"result": execute_query_tool.invoke(state["query"])}
This function executes the generated SQL query using LangChain's QuerySQLDataBaseTool, which runs the statement against the database and returns either the result rows or an error message as a string. The real safety net here is the human review step added later; the tool itself runs whatever it is given.
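If you want an extra programmatic guard in front of the tool, a simple check could reject anything other than a single SELECT statement. This is an illustrative sketch, not a real SQL sanitizer (it does not handle comments, CTEs starting with WITH, or vendor quirks):

```python
def is_read_only(sql: str) -> bool:
    """Crude allowlist check: accept a single SELECT statement only."""
    stripped = sql.strip().rstrip(";")
    if ";" in stripped:  # more than one statement
        return False
    return stripped.lstrip().upper().startswith("SELECT")

assert is_read_only("SELECT COUNT(*) FROM users;")
assert not is_read_only("DROP TABLE users;")
assert not is_read_only("SELECT 1; DELETE FROM users")
```

Such a guard would complement, not replace, the human confirmation step configured below.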
4.3 Answer Generation
def generate_answer(state: State):
    """Generate a natural language answer from the query result."""
    prompt = (
        "Given the following user question, corresponding SQL query, "
        "and SQL result, answer the user question.\n\n"
        f'Question: {state["question"]}\n'
        f'SQL Query: {state["query"]}\n'
        f'SQL Result: {state["result"]}'
    )
    response = llm.invoke(prompt)
    return {"answer": response.content}
This function converts technical SQL results into user-friendly natural language answers.
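Because the prompt is plain string formatting, you can inspect exactly what the model receives without calling the LLM. Using the values from the example run shown later in this post:

```python
state = {
    "question": "How many users are there?",
    "query": "SELECT COUNT(*) as user_count FROM users;",
    "result": "[(10,)]",
}

# Same string assembly as generate_answer, minus the llm.invoke call
prompt = (
    "Given the following user question, corresponding SQL query, "
    "and SQL result, answer the user question.\n\n"
    f'Question: {state["question"]}\n'
    f'SQL Query: {state["query"]}\n'
    f'SQL Result: {state["result"]}'
)
print(prompt)
```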
5. Workflow Orchestration and Human Intervention Mechanism
5.1 Basic Workflow Setup
memory = MemorySaver()
graph_builder = StateGraph(State).add_sequence([
    write_query,
    execute_query,
    generate_answer,
])
graph_builder.add_edge(START, "write_query")
5.2 Human Intervention Point Configuration
graph = graph_builder.compile(
checkpointer=memory,
interrupt_before=["execute_query"] # Key configuration: interrupt before executing query
)
The human intervention mechanism relies on several key points:
1. Interrupt Point Setup:
• The interrupt_before=["execute_query"] parameter tells the graph to pause before the query-execution step
• This ensures the user has the opportunity to review and confirm before any operation that may affect the database
2. State Persistence:
• MemorySaver checkpoints the workflow state at each step
• This allows the workflow to resume after the interruption without losing previous processing results
3. Phased Execution:
config = {"configurable": {"thread_id": "1"}}

# First phase: run the graph up to the interrupt point
for step in graph.stream(
    {"question": "How many users are there?"},
    config,
    stream_mode="updates",
):
    print(step)  # Display the execution result of each step
4. User Interaction:
try:
    user_approval = input("Do you want to go to execute query? (yes/no): ")
except Exception:
    user_approval = "no"  # Default to denying execution if input fails
5. Conditional Continuation:
if user_approval.lower() == "yes":
    # Continue executing the remaining workflow
    for step in graph.stream(None, config, stream_mode="updates"):
        print(step)
else:
    print("Operation cancelled by user.")
5.3 Workflow Execution Process
The complete execution process is as follows:
1. The write_query step runs first and generates the SQL query
2. The graph automatically interrupts before reaching the execute_query step
3. The generated query is displayed, and the workflow waits for user confirmation
4. Based on the user's choice:
• If confirmed, the query is executed and the answer is generated
• If denied, the operation terminates
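The pause-and-resume control flow above can be sketched in plain Python as a toy model. This is for intuition only: the real graph persists state through the checkpointer and resumes across process boundaries, but the decision logic is the same idea.

```python
def write_query(state):
    return {"query": "SELECT COUNT(*) FROM users;"}

def execute_query(state):
    return {"result": "[(10,)]"}

def generate_answer(state):
    return {"answer": "There are 10 users."}

STEPS = [
    ("write_query", write_query),
    ("execute_query", execute_query),
    ("generate_answer", generate_answer),
]

def run(state, approve, interrupt_before=("execute_query",)):
    """Toy interrupt_before: ask for approval ahead of the named steps."""
    for name, fn in STEPS:
        if name in interrupt_before and not approve(state):
            return state  # user declined: stop before the risky step
        state = {**state, **fn(state)}  # merge the node's partial update
    return state

final = run({"question": "How many users are there?"}, approve=lambda s: True)
assert final["answer"] == "There are 10 users."

stopped = run({"question": "How many users are there?"}, approve=lambda s: False)
assert "query" in stopped and "result" not in stopped  # halted before execution
```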
5.4 Example Output
{'write_query': {'query': 'SELECT COUNT(*) as user_count FROM users;'}}
{'__interrupt__': ()}
Do you want to go to execute query? (yes/no): yes
{'execute_query': {'result': '[(10,)]'}}
{'generate_answer': {'answer': 'According to the SQL query result, the user count is 10.'}}
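Note that state["result"] holds a string, not a list of rows. If downstream code needs the actual value, one option (assuming the tool keeps this Python-repr output format) is ast.literal_eval:

```python
import ast

result = "[(10,)]"  # string form, as stored in state["result"]
rows = ast.literal_eval(result)  # -> list of row tuples
user_count = rows[0][0]
print(user_count)  # 10
```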
The advantages of this human intervention mechanism include:
• Security: prevents unreviewed queries from being executed directly
• Control: users can intervene at critical nodes
• Transparency: clearly displays the execution result of each step
• Flexibility: interrupt points can be added at other nodes as needed
6. Conclusion
This project demonstrates how to combine several powerful tools (LangChain, LangGraph, and GPT-4o-mini) to build an intelligent and secure SQL query assistant. Through layered design and human-in-the-loop collaboration, the system stays highly automated while keeping operations safe.
Key features:
• Natural language understanding and SQL generation
• Type safety and error handling
• Human-in-the-loop workflow
• Extensible, modular design
This solution can serve as a reference architecture for building other AI-driven database tools.
7. Complete Code
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from typing_extensions import TypedDict, Annotated
db = SQLDatabase.from_uri("sqlite:///test.db")
# Application State
class State(TypedDict):
    question: str
    query: str
    result: str
    answer: str
llm = ChatOpenAI(model="gpt-4o-mini")
query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")
# Convert question to SQL query
class QueryOutput(TypedDict):
    """Generated SQL query."""

    query: Annotated[str, ..., "Syntactically valid SQL query."]

def write_query(state: State):
    """Generate SQL query to fetch information."""
    prompt = query_prompt_template.invoke(
        {
            "dialect": db.dialect,
            "top_k": 10,
            "table_info": db.get_table_info(),
            "input": state["question"],
        }
    )
    structured_llm = llm.with_structured_output(QueryOutput)
    result = structured_llm.invoke(prompt)
    return {"query": result["query"]}
# Execute query
def execute_query(state: State):
    """Execute SQL query."""
    execute_query_tool = QuerySQLDataBaseTool(db=db)
    return {"result": execute_query_tool.invoke(state["query"])}
# Generate answer
def generate_answer(state: State):
    """Answer question using retrieved information as context."""
    prompt = (
        "Given the following user question, corresponding SQL query, "
        "and SQL result, answer the user question.\n\n"
        f'Question: {state["question"]}\n'
        f'SQL Query: {state["query"]}\n'
        f'SQL Result: {state["result"]}'
    )
    response = llm.invoke(prompt)
    return {"answer": response.content}
# Orchestration
memory = MemorySaver()
graph_builder = StateGraph(State).add_sequence(
    [write_query, execute_query, generate_answer]
)
graph_builder.add_edge(START, "write_query")
# Human-machine collaboration
graph = graph_builder.compile(checkpointer=memory, interrupt_before=["execute_query"])
config = {"configurable": {"thread_id": "1"}}
for step in graph.stream(
    {"question": "How many users are there?"},
    config,
    stream_mode="updates",
):
    print(step)

try:
    user_approval = input("Do you want to go to execute query? (yes/no): ")
except Exception:
    user_approval = "no"

if user_approval.lower() == "yes":
    # If approved, continue the graph execution
    for step in graph.stream(None, config, stream_mode="updates"):
        print(step)
else:
    print("Operation cancelled by user.")
'''{'write_query': {'query': 'SELECT COUNT(*) as user_count FROM users;'}}
{'__interrupt__': ()}
Do you want to go to execute query? (yes/no): yes
{'execute_query': {'result': '[(10,)]'}}
{'generate_answer': {'answer': 'According to the SQL query result, the user count is 10.'}}'''
Recommended Reading
• FastAPI Introduction Series Collection
• Django Introduction Series Collection
• Flask Tutorial Series Collection
• tkinter Tutorial Series Collection
• Flet Tutorial Series Collection