Welcome back, architects! I'm your old friend.
Over the past 25 episodes, we've been like a group of geeks coding away in a basement. We've meticulously crafted our "AI Content Agency," empowering the Planner to strategize, enabling the Researcher to scour the web for data, giving the Writer a brilliant pen, and equipping the Editor with a sharp eye for detail.
Watching the screen full of scrolling green text in the terminal, you might think: "Wow, this is so cool!"
But wake up, folks! Your boss, your clients, and your end-users are absolutely never going to open a terminal and type commands! What they need is a sleek webpage, an input box, a smooth loading animation, and finally—ding!—a perfectly crafted viral article appearing on their screen.
Starting with this episode, we are bringing our AI Agency out of the shadows. We will explore: how to wrap an extremely time-consuming "slow-thinking" engine like LangGraph in a standard, elegant Web API, allowing frontends (like React/Vue) to interact with it seamlessly.
Don't underestimate this. If you dare to directly await graph.ainvoke() inside a FastAPI route (or graph.invoke() inside an Express handler with LangGraph.js), I guarantee your requests will die from HTTP timeouts, leaving your users staring at a blank screen questioning their life choices.
Ready? Today, we won't just discuss the theory; we're going to write the actual code.
🎯 Learning Objectives for This Episode
- Cognitive Upgrade: Understand the fundamental conflict between LangGraph's state machine and the traditional HTTP request-response model.
- Architecture Design: Master the frontend-backend separation architecture using "Asynchronous Task Dispatch + State Polling (Heartbeat)".
- Backend Practice: Build a standard LangGraph wrapper using FastAPI, perfectly binding the Task ID with LangGraph's Thread ID.
- Frontend Integration: Write the heartbeat polling logic in React (TypeScript) to achieve a silky-smooth user experience.
📖 Theory Breakdown
In traditional Web development, typical CRUD APIs are synchronous: the frontend sends a request -> the backend queries the database -> returns the result. The entire process usually completes within 200 milliseconds.
But what about our AI Content Agency? The Planner needs to brainstorm an outline (3 seconds), the Researcher needs to search Google and summarize (10 seconds), the Writer needs to draft a 2,000-word article (20 seconds), and the Editor needs to review and revise (10 seconds). A complete Graph execution might take anywhere from 40 seconds to a full minute!
If you use a traditional synchronous HTTP request:
The frontend sends a POST request and just sits there waiting. Somewhere along the path a timeout is counting down: reverse proxies, load balancers, and API gateways commonly give up after 30 to 60 seconds by default. If the network jitters even slightly, or the LLM API hiccups, the request outlives that limit and the connection drops. The user sees a 504 Gateway Timeout, while your backend is still sweating away running the model, and the final result can never be returned to the frontend. This is a classic case of "money spent (tokens consumed), work done, but the customer left."
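To make the failure mode concrete, here is a minimal simulation using only the standard library. The stage durations come from the estimates above; the function names, the 30-second limit, and the time scaling are assumptions made for this sketch, not part of any real gateway.

```python
import asyncio

# Stage durations in seconds, taken from the estimates above.
STAGE_SECONDS = {"planner": 3, "researcher": 10, "writer": 20, "editor": 10}


async def run_pipeline(scale: float, finished: list) -> str:
    """Pretend to run each agent in turn; `scale` shrinks the waits for the demo."""
    for name, seconds in STAGE_SECONDS.items():
        await asyncio.sleep(seconds * scale)
        finished.append(name)
    return "final article"


async def synchronous_request(timeout_s: float, scale: float = 0.01):
    """Simulate one blocking HTTP call guarded by a gateway timeout (hypothetical helper)."""
    finished: list = []
    job = asyncio.ensure_future(run_pipeline(scale, finished))
    try:
        # shield() keeps the backend job alive even when the waiter gives up.
        await asyncio.wait_for(asyncio.shield(job), timeout=timeout_s * scale)
        outcome = "200 OK"
    except asyncio.TimeoutError:
        outcome = "504 Gateway Timeout"
        await job  # the backend still finishes work that nobody will receive
    return outcome, finished


if __name__ == "__main__":
    print("Total pipeline time:", sum(STAGE_SECONDS.values()), "seconds")
    print(asyncio.run(synchronous_request(timeout_s=30)))
```

With a 30-second limit the caller gets the timeout while all four stages still run to completion, which is exactly the "customer left" scenario.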
Solution: The Restaurant Order Number Model (Asynchronous Polling)
We need to introduce an "Asynchronous Task + Polling" mechanism. It's just like ordering food at KFC:
- You order a "Family Bucket" at the counter (submit the writing topic).
- The cashier doesn't make you stand at the counter waiting. Instead, they give you an order receipt (Task ID) and tell you, "Please have a seat and watch the screen." (The backend immediately returns a 202 Accepted.)
- You (the frontend) look up at the big screen every few seconds (send a GET request to check the status).
- When the screen shows your order number, you take your receipt to pick up your food (fetch the final article).
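Before any web framework gets involved, the order-number model can be sketched in plain asyncio. Everything here (the TASKS dictionary, place_order, check_screen, the sleep durations) is a hypothetical stand-in chosen for illustration, with an in-memory dict playing the role of the big screen.

```python
import asyncio
import uuid

TASKS: dict = {}       # task_id -> latest state (what the "big screen" shows)
_RUNNING: set = set()  # keep references so background jobs are not garbage collected


async def kitchen(task_id: str, topic: str) -> None:
    """The slow work: each agent takes a turn, then the article is ready."""
    for agent in ("Planner", "Writer", "Editor"):
        TASKS[task_id]["current_agent"] = agent
        await asyncio.sleep(0.01)  # stand-in for LLM latency
    TASKS[task_id].update(status="completed", article=f"Article about {topic}")


def place_order(topic: str) -> str:
    """Return a receipt immediately; must be called inside a running event loop."""
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"status": "running", "current_agent": None, "article": None}
    job = asyncio.create_task(kitchen(task_id, topic))
    _RUNNING.add(job)
    job.add_done_callback(_RUNNING.discard)
    return task_id


def check_screen(task_id: str) -> dict:
    """The polling call: a cheap read of the latest state."""
    return dict(TASKS[task_id])


async def customer(topic: str):
    """Place an order, then poll until the screen says it is done."""
    task_id = place_order(topic)
    polls = 0
    while check_screen(task_id)["status"] != "completed":
        polls += 1
        await asyncio.sleep(0.005)
    return check_screen(task_id)["article"], polls


if __name__ == "__main__":
    print(asyncio.run(customer("Future of AI")))
```

The key property is that place_order returns before any work has happened, and every later read is fast no matter how slow the kitchen is.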
When combined with LangGraph, we have a massive advantage: LangGraph natively supports Checkpointers (persistence), and its thread_id is naturally suited to be our Task ID!
Let's look at the flow chart for this architecture:
sequenceDiagram
    participant U as User (React UI)
    participant API as FastAPI (Backend Gate)
    participant BG as Background Task / Worker
    participant LG as LangGraph (AI Agency)
    participant DB as Checkpointer (SQLite/Redis)

    U->>API: 1. POST /api/v1/agency/generate {topic: "Future of AI"}
    API->>DB: 2. Generate thread_id (Task ID)
    API->>BG: 3. Trigger background async Graph execution
    API-->>U: 4. Return 202 Accepted {task_id: "1234-5678"}

    rect rgb(240, 248, 255)
        Note over U, API: Frontend Heartbeat Polling
        loop Execute every 2 seconds
            U->>API: 5. GET /api/v1/agency/status/1234-5678
            API->>DB: 6. Read latest State for this thread_id
            alt Task is running
                DB-->>API: State: {status: "Researcher gathering data..."}
                API-->>U: Return 200 {status: "running", data: null}
            else Task is completed
                DB-->>API: State: {status: "done", final_article: "..."}
                API-->>U: Return 200 {status: "completed", article: "..."}
            end
        end
    end

    %% Background Execution Flow
    BG->>LG: Async call graph.ainvoke(..., config={"configurable": {"thread_id": "1234-5678"}})
    LG->>DB: Real-time update of State at each step

See that? FastAPI is only responsible for greeting the user and checking the status. The real heavy lifting is handed off to background tasks, while LangGraph's Checkpointer serves as the bridge for state synchronization between the frontend and backend.
💻 Practical Code Walkthrough
To let you run this immediately, I will provide two pieces of code: the Python backend (FastAPI + LangGraph) and the TypeScript frontend (React).
1. Backend: FastAPI + LangGraph Wrapper
First, we need a mock Agency Graph. To keep the code from getting too bloated, I'll use a simplified StateGraph to represent our complex Planner/Researcher/Writer workflow, and introduce MemorySaver as the persistence layer.
Install dependencies:
pip install fastapi uvicorn langgraph langchain-openai pydantic
main.py core code:
import asyncio
import uuid
from typing import TypedDict

from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# ==========================================
# 1. Define LangGraph State and Nodes (Simulating our AI Agency)
# ==========================================
class AgencyState(TypedDict):
    topic: str
    current_agent: str  # The agent currently working
    draft: str          # Draft
    final_article: str  # Final article
    status: str         # Task status: "running", "completed", "failed"

# Simulate Planner Node
async def planner_node(state: AgencyState):
    print(f"[Planner] Brainstorming outline for topic '{state['topic']}'...")
    await asyncio.sleep(2)  # Simulate LLM thinking time
    return {"current_agent": "Planner", "draft": "Outline: 1. Background 2. Development 3. Conclusion"}

# Simulate Writer Node
async def writer_node(state: AgencyState):
    print("[Writer] Drafting based on the outline...")
    await asyncio.sleep(3)  # Simulate LLM writing time
    return {"current_agent": "Writer", "draft": state["draft"] + "\nBody: AI is reshaping the world..."}

# Simulate Editor Node
async def editor_node(state: AgencyState):
    print("[Editor] Polishing the article...")
    await asyncio.sleep(2)  # Simulate LLM review time
    final_text = state["draft"] + "\n[Proofreading complete, ready to publish]"
    return {"current_agent": "Editor", "final_article": final_text, "status": "completed"}

# Build the workflow graph
workflow = StateGraph(AgencyState)
workflow.add_node("planner", planner_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)
workflow.add_edge(START, "planner")
workflow.add_edge("planner", "writer")
workflow.add_edge("writer", "editor")
workflow.add_edge("editor", END)

# Core: Introduce Checkpointer so every step's state can be saved and queried!
memory = MemorySaver()
agency_graph = workflow.compile(checkpointer=memory)
# ==========================================
# 2. FastAPI Wrapper (Web API)
# ==========================================
app = FastAPI(title="AI Content Agency API")

# Allow CORS for frontend requests (wide open for the demo; restrict origins in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request Model
class GenerateRequest(BaseModel):
    topic: str

# Async background task: Responsible for actually executing LangGraph
async def run_agency_graph_background(thread_id: str, topic: str):
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "topic": topic,
        "current_agent": "Initializing",
        "draft": "",
        "final_article": "",
        "status": "running",
    }
    try:
        # Use ainvoke for async execution so the event loop stays free between awaits.
        # Note: Here we run it all at once. Because the checkpointer is configured,
        # LangGraph will automatically write each step's state into memory in the background.
        await agency_graph.ainvoke(initial_state, config=config)
    except Exception as e:
        print(f"Graph execution failed: {e}")
        # If an error occurs, we manually update the status to failed
        # (in production, a dedicated error-handling node is recommended)
        agency_graph.update_state(config, {"status": "failed"})

@app.post("/api/v1/agency/generate", status_code=202)
async def start_generation(req: GenerateRequest, background_tasks: BackgroundTasks):
    """
    Frontend calls this endpoint to submit a task and get an order number (task_id)
    """
    # 1. Generate a unique Task ID (equivalent to LangGraph's Thread ID)
    task_id = str(uuid.uuid4())
    # 2. Throw the time-consuming Graph execution into a background task
    background_tasks.add_task(run_agency_graph_background, task_id, req.topic)
    # 3. Immediately return the Task ID to the frontend
    return {
        "message": "Task accepted. Please poll the status.",
        "task_id": task_id,
    }

@app.get("/api/v1/agency/status/{task_id}")
async def get_status(task_id: str):
    """
    Frontend polls this endpoint via heartbeat to get the latest progress
    """
    config = {"configurable": {"thread_id": task_id}}
    # Read the latest State from the Checkpointer
    state_snapshot = agency_graph.get_state(config)
    # If no snapshot is found, the task hasn't started or the ID is wrong
    if not state_snapshot or not state_snapshot.values:
        raise HTTPException(status_code=404, detail="Task not found or not started yet.")
    current_state = state_snapshot.values
    # Construct friendly data to return to the frontend
    response = {
        "task_id": task_id,
        "status": current_state.get("status", "running"),
        "current_agent": current_state.get("current_agent", "Unknown"),
    }
    # If completed, return the final article
    if current_state.get("status") == "completed":
        response["final_article"] = current_state.get("final_article")
    return response

if __name__ == "__main__":
    import uvicorn
    # Run: python main.py
    uvicorn.run(app, host="0.0.0.0", port=8000)
2. Frontend: React Heartbeat Polling
Now, the backend is ready. The frontend no longer needs to sit and wait for a minute. Let's use TypeScript + React hooks to write an elegant polling mechanism.
// Frontend: React Component (AgencyClient.tsx)
import React, { useState, useEffect, useRef } from 'react';

// Define interface types
interface TaskResponse {
  task_id: string;
  status: 'running' | 'completed' | 'failed';
  current_agent: string;
  final_article?: string;
}

export const AgencyClient: React.FC = () => {
  const [topic, setTopic] = useState<string>('');
  const [taskId, setTaskId] = useState<string | null>(null);
  const [status, setStatus] = useState<string>('idle');
  const [agentMsg, setAgentMsg] = useState<string>('');
  const [article, setArticle] = useState<string>('');

  // Use ref to store the timer ID for easy cleanup.
  // ReturnType<typeof setInterval> type-checks under both browser and Node typings.
  const pollingIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null);

  // 1. Submit task (Place order)
  const handleGenerate = async () => {
    setStatus('submitting');
    setArticle('');
    try {
      const res = await fetch('http://localhost:8000/api/v1/agency/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ topic })
      });
      // fetch only rejects on network errors, so check the HTTP status explicitly
      if (!res.ok) throw new Error(`Submission failed with HTTP ${res.status}`);
      const data = await res.json();
      setTaskId(data.task_id); // Got the order number!
      setStatus('running');
    } catch (error) {
      console.error("Submission failed", error);
      setStatus('failed');
    }
  };

  // 2. Listen for taskId changes, start heartbeat polling
  useEffect(() => {
    if (!taskId || status !== 'running') return;

    // Define polling function
    const pollStatus = async () => {
      try {
        const res = await fetch(`http://localhost:8000/api/v1/agency/status/${taskId}`);
        // A 404 here can simply mean the background task has not written its first state yet
        if (!res.ok) return;
        const data: TaskResponse = await res.json();

        // Update UI to show which Agent is currently working
        setAgentMsg(`Current processing node: ${data.current_agent}...`);

        if (data.status === 'completed') {
          // Task completed, stop polling, display article
          setStatus('completed');
          setArticle(data.final_article || '');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        } else if (data.status === 'failed') {
          setStatus('failed');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        }
      } catch (error) {
        console.error("Polling failed", error);
      }
    };

    // Check immediately once, then every 2 seconds (Heartbeat: 2000ms)
    pollStatus();
    pollingIntervalRef.current = setInterval(pollStatus, 2000);

    // Clean up timer on component unmount or when taskId/status changes
    return () => {
      if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
    };
  }, [taskId, status]);

  const isBusy = status === 'running' || status === 'submitting';

  return (
    <div style={{ padding: '20px', maxWidth: '600px', margin: '0 auto' }}>
      <h2>AI Content Agency Client</h2>
      <div style={{ display: 'flex', gap: '10px', marginBottom: '20px' }}>
        <input
          type="text"
          value={topic}
          onChange={(e) => setTopic(e.target.value)}
          placeholder="Enter article topic, e.g., Future of AI"
          disabled={isBusy}
          style={{ flex: 1, padding: '8px' }}
        />
        <button
          onClick={handleGenerate}
          disabled={!topic || isBusy}
        >
          {isBusy ? 'Creating...' : 'Start Creation'}
        </button>
      </div>

      {/* Status Display Area */}
      {status === 'running' && (
        <div style={{ color: 'blue', fontStyle: 'italic' }}>
          <p>⏳ Working hard on your draft, please wait...</p>
          <p>🤖 {agentMsg}</p>
        </div>
      )}

      {/* Result Display Area */}
      {status === 'completed' && (
        <div style={{ border: '1px solid #ccc', padding: '15px', borderRadius: '8px', backgroundColor: '#f9f9f9' }}>
          <h3>🎉 Creation Complete:</h3>
          <pre style={{ whiteSpace: 'pre-wrap', fontFamily: 'inherit' }}>{article}</pre>
        </div>
      )}
    </div>
  );
};
Pitfalls and How to Avoid Them
Folks, while the code above runs perfectly fine, there are a few massive pitfalls you must be aware of in a production environment. As a mentor with 10 years of experience stepping into these traps, I need to give you a heads-up:
🚨 Pitfall 1: Memory Explosion (The Hidden Danger of MemorySaver)
In the demo, we used MemorySaver() as the Checkpointer. This means the states of all tasks are stored in the memory of the FastAPI process.
Consequence: If your website goes viral and you get 10,000 tasks a day, your server's memory will quickly max out. Once you restart the FastAPI service, the states of all currently running tasks will be lost entirely, and the frontend will only ever poll a 404!
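How to Avoid: In a production environment, absolutely never use MemorySaver. Please use a database-backed Checkpointer such as AsyncSqliteSaver, AsyncPostgresSaver, or RedisSaver, so the state lives outside the FastAPI process. This way, saved state survives restarts, and with a shared store such as Postgres or Redis you can also horizontally scale multiple FastAPI nodes (a local SQLite file only helps a single node).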
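To see why persistence matters, here is a toy illustration built on the standard library's sqlite3 module. It is deliberately not LangGraph's checkpointer API: SqliteTaskStore, its table layout, and simulate_restart are hypothetical names invented for this sketch, and a real deployment would use one of the savers named above.

```python
import json
import os
import sqlite3
import tempfile


class SqliteTaskStore:
    """Toy stand-in for a persistent checkpointer (NOT LangGraph's API)."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS task_state ("
            "thread_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
        )
        self.conn.commit()

    def save(self, thread_id: str, state: dict) -> None:
        # One row per thread_id; a newer state overwrites the older one.
        self.conn.execute(
            "INSERT OR REPLACE INTO task_state (thread_id, state) VALUES (?, ?)",
            (thread_id, json.dumps(state)),
        )
        self.conn.commit()

    def load(self, thread_id: str):
        row = self.conn.execute(
            "SELECT state FROM task_state WHERE thread_id = ?", (thread_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

    def close(self) -> None:
        self.conn.close()


def simulate_restart() -> tuple:
    """Save one task, 'restart' the process, and report what each store remembers."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "checkpoints.db")
        in_memory = {"1234-5678": {"status": "running"}}  # MemorySaver-style storage
        store = SqliteTaskStore(path)
        store.save("1234-5678", {"status": "running", "current_agent": "Writer"})
        store.close()

        in_memory = {}                 # a restart wipes process memory
        store = SqliteTaskStore(path)  # the file on disk is still there
        recovered = store.load("1234-5678")
        store.close()
        return in_memory.get("1234-5678"), recovered


if __name__ == "__main__":
    print(simulate_restart())
```

After the simulated restart, the in-memory copy is gone (the polling client would see a 404), while the on-disk copy still reports the Writer's progress.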
🚨 Pitfall 2: The Blocking Crisis of BackgroundTasks
An async def function handed to FastAPI's BackgroundTasks runs on the same Event Loop as your request handlers. If a Node in your LangGraph contains synchronous blocking code (like calling requests.get, or doing CPU-intensive text processing), it will freeze the entire FastAPI app, causing the frontend's polling requests (GET status) to queue up and time out.
How to Avoid:
- Ensure every node in your Graph is truly asynchronous (use async def and async HTTP clients like httpx).
- If you must run synchronous, time-consuming legacy code, use asyncio.to_thread() to execute it in a thread pool, or simply introduce Celery / Redis Queue (RQ) to completely decouple the tasks into independent Worker processes.
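The difference between blocking the loop and offloading to a thread is easy to measure. In this small sketch, a heartbeat coroutine stands in for the frontend's polling requests, and legacy_blocking_step stands in for a synchronous node; both names and the durations are assumptions for illustration. It needs Python 3.9 or newer for asyncio.to_thread().

```python
import asyncio
import time


def legacy_blocking_step(seconds: float) -> str:
    """Synchronous code (think requests.get) that cannot be awaited."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Stand-in for status polls: each tick is one request the loop managed to serve."""
    while True:
        ticks[0] += 1
        await asyncio.sleep(interval)


async def run_with_heartbeat(offload: bool) -> int:
    """Run the blocking step and count how many heartbeats got through meanwhile."""
    ticks = [0]
    hb = asyncio.create_task(heartbeat(ticks, 0.02))
    await asyncio.sleep(0)  # let the heartbeat tick once before the work starts
    if offload:
        await asyncio.to_thread(legacy_blocking_step, 0.3)  # loop stays responsive
    else:
        legacy_blocking_step(0.3)  # freezes the whole event loop for 0.3 s
    hb.cancel()
    return ticks[0]


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(run_with_heartbeat(False)))
    print("ticks with asyncio.to_thread:", asyncio.run(run_with_heartbeat(True)))
```

When the blocking call runs directly on the loop, only the initial tick is recorded; with asyncio.to_thread() the heartbeat keeps ticking throughout, which is what your status endpoint needs.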
🚨 Pitfall 3: DDoSing Yourself (Polling Frequency Too High)
If the frontend's setInterval is set to 100 milliseconds, and 100 users place orders simultaneously, your backend will have to endure 1,000 queries per second.
How to Avoid:
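Introduce a backoff strategy. Start by polling every 2 seconds; if it's not done after 10 seconds, switch to every 5 seconds; after 30 seconds, switch to every 10 seconds. (A strict Exponential Backoff doubles the interval after every poll up to a cap; the stepped schedule here is a simpler variant of the same idea.) Don't blindly hammer the server with high-frequency requests.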
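The schedule above fits in a few lines. This sketch is written in Python for consistency with the backend, but the same arithmetic ports directly to a setTimeout loop in the React component. The function names are hypothetical, and exponential_delay is included only as the doubling variant for comparison.

```python
def stepped_delay(elapsed_s: float) -> float:
    """Seconds to wait before the next poll, given the time elapsed since submission."""
    if elapsed_s < 10:
        return 2.0
    if elapsed_s < 30:
        return 5.0
    return 10.0


def exponential_delay(attempt: int, base_s: float = 2.0, cap_s: float = 10.0) -> float:
    """Doubling variant: 2, 4, 8, then capped at cap_s."""
    return min(cap_s, base_s * 2 ** attempt)


def poll_times(total_s: float) -> list:
    """Timestamps (in seconds) at which polls fire during the first total_s seconds."""
    times, t = [], 0.0
    while t <= total_s:
        times.append(t)
        t += stepped_delay(t)
    return times


if __name__ == "__main__":
    schedule = poll_times(60)
    print(schedule)
    print(len(schedule), "polls with backoff vs", 60 // 2 + 1, "at a fixed 2 seconds")
```

Over a one-minute task the stepped schedule sends 13 polls instead of the 31 a fixed 2-second interval would send, and most of the savings come from the long tail where the user is already waiting anyway.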
📝 Summary for This Episode
Today, we successfully led the AI Content Agency out of the terminal's basement, dressed it in a FastAPI suit, and completed its first "handshake" with a React frontend.
We learned:
- LangGraph's long-running nature dictates that we must adopt an asynchronous flow with frontend-backend separation.
- The Order Number Model: Use POST to trigger the task and return a Task ID, and GET to poll the status.
- LangGraph's Killer Feature: By directly utilizing checkpointer and thread_id, we perfectly bind the business Task ID to the underlying state graph. The backend doesn't need to maintain an extra state dictionary at all!
Next Episode Teaser: While polling is useful, it's not "sexy" enough. Have you noticed that when ChatGPT generates content, it pops out word by word (the typewriter effect)? In Episode 27, we will take on an advanced challenge: ditching polling and introducing SSE (Server-Sent Events) with Streaming output, allowing your frontend to see every single word typed by the Writer Agent in real-time!
Architects, get today's code running, and I'll see you in the next episode! Don't forget to add Postgres persistence to your APIs!