lesson-26

20 MIN READ | UPDATED: 2026-05-07

Welcome back, architects! I'm your old friend.

Over the past 25 episodes, we've been like a group of geeks coding away in a basement. We've meticulously crafted our "AI Content Agency," empowering the Planner to strategize, enabling the Researcher to scour the web for data, giving the Writer a brilliant pen, and equipping the Editor with a sharp eye for detail.

Watching the screen full of scrolling green text in the terminal, you might think: "Wow, this is so cool!"

But wake up, folks! Your boss, your clients, and your end-users are absolutely never going to open a terminal and type commands! What they need is a sleek webpage, an input box, a smooth loading animation, and finally—ding!—a perfectly crafted viral article appearing on their screen.

Starting with this episode, we are bringing our AI Agency out of the shadows. We will explore: how to wrap an extremely time-consuming "slow-thinking" engine like LangGraph in a standard, elegant Web API, allowing frontends (like React/Vue) to interact with it seamlessly.

Don't underestimate this. If you simply await graph.ainvoke() inside a FastAPI handler (or await graph.invoke() in Express), the HTTP request stays open for the entire run. Sooner or later something on the path times out, leaving your users staring at a blank screen questioning their life choices.

Ready? Today, we won't just discuss the theory; we're going to write the actual code.


🎯 Learning Objectives for This Episode

  1. Cognitive Upgrade: Understand the fundamental conflict between LangGraph's state machine and the traditional HTTP request-response model.
  2. Architecture Design: Master the frontend-backend separation architecture using "Asynchronous Task Dispatch + State Polling (Heartbeat)".
  3. Backend Practice: Build a standard LangGraph wrapper using FastAPI, perfectly binding the Task ID with LangGraph's Thread ID.
  4. Frontend Integration: Write the heartbeat polling logic in React (TypeScript) to achieve a silky-smooth user experience.

📖 Theory Breakdown

In traditional Web development, typical CRUD APIs are synchronous: the frontend sends a request -> the backend queries the database -> returns the result. The entire process usually completes within 200 milliseconds.

But what about our AI Content Agency? The Planner needs to brainstorm an outline (3 seconds), the Researcher needs to search Google and summarize (10 seconds), the Writer needs to draft a 2,000-word article (20 seconds), and the Editor needs to review and revise (10 seconds). A complete Graph execution might take anywhere from 40 seconds to a full minute!

If you use a traditional synchronous HTTP request: The frontend sends a POST request and just sits there waiting. Browsers themselves are fairly patient, but the reverse proxies, load balancers, and API gateways sitting between the browser and your server commonly cut off a request after 30 to 60 seconds without a response. If the LLM API hiccups or one agent runs long, the connection drops. The user sees a 504 Gateway Timeout, while your backend is still sweating away running the model, and the final result can never be returned to the frontend. This is a classic case of "money spent (tokens consumed), work done, but the customer left."
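
To see the "customer left" failure in miniature, here is a toy simulation in plain asyncio. Everything in it is made up for illustration: `slow_pipeline` stands in for the graph run, the durations are scaled down from seconds to fractions of a second, and the `asyncio.wait_for` timeout plays the role of whichever hop gives up first.

```python
import asyncio


async def slow_pipeline(finished: list[str]) -> str:
    """Hypothetical stand-in for a full graph run (scaled down to 0.3 s)."""
    await asyncio.sleep(0.3)
    finished.append("article")  # the backend did finish the work
    return "article"


async def synchronous_style_request(timeout_s: float) -> tuple[str, list[str]]:
    """Hold the 'connection' open for the whole run, with a timeout in the way."""
    finished: list[str] = []
    job = asyncio.ensure_future(slow_pipeline(finished))
    try:
        # shield() keeps the job alive when the waiter times out, just as a
        # backend keeps running after the proxy has dropped the connection.
        await asyncio.wait_for(asyncio.shield(job), timeout=timeout_s)
        outcome = "200 OK"
    except asyncio.TimeoutError:
        outcome = "504 Gateway Timeout"
    await job  # let the orphaned work run to completion
    return outcome, finished


if __name__ == "__main__":
    # The caller gets a timeout, yet the work was done anyway.
    print(asyncio.run(synchronous_style_request(timeout_s=0.1)))
    # ('504 Gateway Timeout', ['article'])
```

The second element of the tuple is the painful part: the article exists, but nobody is left on the line to receive it.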

Solution: The Restaurant Order Number Model (Asynchronous Polling)

We need to introduce an "Asynchronous Task + Polling" mechanism. It's just like ordering food at KFC:

  1. You order a "Family Bucket" at the counter (submit the writing topic).
  2. The cashier doesn't make you stand at the counter waiting. Instead, they give you an order receipt (Task ID) and tell you, "Please have a seat and watch the screen." (The backend immediately returns a 202 Accepted).
  3. You (the frontend) look up at the big screen every few seconds (send a GET request to check the status).
  4. When the screen shows your order number, you take your receipt to pick up your food (fetch the final article).

When combined with LangGraph, we have a massive advantage: LangGraph natively supports Checkpointers (persistence), and its thread_id is naturally suited to be our Task ID!
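
Before we lean on LangGraph, here is the order-number model stripped down to plain asyncio, as a rough sketch. The names (`ORDERS`, `place_order`, `check_screen`, `customer`) are invented for this illustration, and the dict is exactly the kind of hand-rolled bookkeeping that the checkpointer takes over in the real code.

```python
import asyncio
import uuid

ORDERS: dict[str, dict] = {}          # hypothetical in-process "big screen"
_RUNNING: set[asyncio.Task] = set()   # hold references so tasks aren't garbage-collected


async def _kitchen(task_id: str, topic: str) -> None:
    await asyncio.sleep(0.1)  # stands in for the slow graph run
    ORDERS[task_id] = {"status": "completed", "article": f"Article about {topic}"}


def place_order(topic: str) -> str:
    """Steps 1-2: accept the order and hand back a receipt immediately."""
    task_id = str(uuid.uuid4())
    ORDERS[task_id] = {"status": "running", "article": None}
    task = asyncio.create_task(_kitchen(task_id, topic))
    _RUNNING.add(task)
    task.add_done_callback(_RUNNING.discard)
    return task_id


def check_screen(task_id: str) -> dict:
    """Step 3: a cheap status lookup that never waits for the kitchen."""
    return ORDERS[task_id]


async def customer(topic: str) -> dict:
    task_id = place_order(topic)
    while check_screen(task_id)["status"] == "running":
        await asyncio.sleep(0.02)  # look up at the screen every so often
    return check_screen(task_id)   # step 4: pick up the food


if __name__ == "__main__":
    print(asyncio.run(customer("Future of AI")))
    # {'status': 'completed', 'article': 'Article about Future of AI'}
```

Note that `place_order` returns before the kitchen has done anything; that immediate receipt is the whole trick.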

Let's look at the flow chart for this architecture:

sequenceDiagram
    participant U as User (React UI)
    participant API as FastAPI (Backend Gate)
    participant BG as Background Task / Worker
    participant LG as LangGraph (AI Agency)
    participant DB as Checkpointer (SQLite/Redis)

    U->>API: 1. POST /api/v1/agency/generate {topic: "Future of AI"}
    API->>API: 2. Generate thread_id (Task ID)
    API->>BG: 3. Trigger background async Graph execution
    API-->>U: 4. Return 202 Accepted {task_id: "1234-5678"}
    
    rect rgb(240, 248, 255)
    Note over U, API: Frontend Heartbeat Polling
    loop Execute every 2 seconds
        U->>API: 5. GET /api/v1/agency/status/1234-5678
        API->>DB: 6. Read latest State for this thread_id
        alt Task is running
            DB-->>API: State: {status: "running", current_agent: "Writer"}
            API-->>U: Return 200 {status: "running", current_agent: "Writer"}
        else Task is completed
            DB-->>API: State: {status: "completed", final_article: "..."}
            API-->>U: Return 200 {status: "completed", final_article: "..."}
        end
    end
    end
    
    %% Background Execution Flow
    BG->>LG: Async call graph.ainvoke(..., config={"configurable": {"thread_id": "1234-5678"}})
    LG->>DB: Real-time update of State at each step

See that? FastAPI is only responsible for greeting the user and checking the status. The real heavy lifting is handed off to background tasks, while LangGraph's Checkpointer serves as the bridge for state synchronization between the frontend and backend.


💻 Practical Code Walkthrough

To let you run this immediately, I will provide two pieces of code: the Python backend (FastAPI + LangGraph) and the TypeScript frontend (React).

1. Backend: FastAPI + LangGraph Wrapper

First, we need a mock Agency Graph. To keep the code from getting too bloated, I'll use a simplified three-node StateGraph (Planner, Writer, Editor; the Researcher is left out for brevity) to stand in for our full workflow, and introduce MemorySaver as the persistence layer.

Install dependencies:

pip install fastapi uvicorn langgraph langchain-openai pydantic

main.py core code:

import asyncio
import uuid
from typing import TypedDict
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# ==========================================
# 1. Define LangGraph State and Nodes (Simulating our AI Agency)
# ==========================================
class AgencyState(TypedDict):
    topic: str
    current_agent: str      # The agent currently working
    draft: str              # Draft
    final_article: str      # Final article
    status: str             # Task status: "running", "completed", "failed"

# Simulate Planner Node
async def planner_node(state: AgencyState):
    print(f"[Planner] Brainstorming outline for topic '{state['topic']}'...")
    await asyncio.sleep(2) # Simulate LLM thinking time
    return {"current_agent": "Planner", "draft": "Outline: 1. Background 2. Development 3. Conclusion"}

# Simulate Writer Node
async def writer_node(state: AgencyState):
    print(f"[Writer] Drafting based on the outline...")
    await asyncio.sleep(3) # Simulate LLM writing time
    return {"current_agent": "Writer", "draft": state["draft"] + "\nBody: AI is reshaping the world..."}

# Simulate Editor Node
async def editor_node(state: AgencyState):
    print(f"[Editor] Polishing the article...")
    await asyncio.sleep(2)
    final_text = state["draft"] + "\n[Proofreading complete, ready to publish]"
    return {"current_agent": "Editor", "final_article": final_text, "status": "completed"}

# Build the workflow graph
workflow = StateGraph(AgencyState)
workflow.add_node("planner", planner_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)

workflow.add_edge(START, "planner")
workflow.add_edge("planner", "writer")
workflow.add_edge("writer", "editor")
workflow.add_edge("editor", END)

# Core: Introduce Checkpointer so every step's state can be saved and queried!
memory = MemorySaver()
agency_graph = workflow.compile(checkpointer=memory)

# ==========================================
# 2. FastAPI Wrapper (Web API)
# ==========================================
app = FastAPI(title="AI Content Agency API")

# Allow CORS for frontend requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request Model
class GenerateRequest(BaseModel):
    topic: str

# Async background task: Responsible for actually executing LangGraph
async def run_agency_graph_background(thread_id: str, topic: str):
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "topic": topic,
        "current_agent": "Initializing",
        "draft": "",
        "final_article": "",
        "status": "running"
    }
    try:
        # ainvoke runs the graph as a coroutine on FastAPI's event loop, so other
        # requests keep being served as long as every node is itself non-blocking.
        # Note: Here we run it all at once. Because the checkpointer is configured,
        # LangGraph will automatically write each step's state into memory in the background.
        await agency_graph.ainvoke(initial_state, config=config)
    except Exception as e:
        print(f"Graph execution failed: {e}")
        # If an error occurs, we manually update the status to failed (in production, a dedicated error-handling node is recommended).
        # The async variant keeps this working if MemorySaver is later swapped for an async checkpointer.
        await agency_graph.aupdate_state(config, {"status": "failed"})

@app.post("/api/v1/agency/generate", status_code=202)
async def start_generation(req: GenerateRequest, background_tasks: BackgroundTasks):
    """
    Frontend calls this endpoint to submit a task and get an order number (task_id)
    """
    # 1. Generate a unique Task ID (equivalent to LangGraph's Thread ID)
    task_id = str(uuid.uuid4())
    
    # 2. Throw the time-consuming Graph execution into a background task
    background_tasks.add_task(run_agency_graph_background, task_id, req.topic)
    
    # 3. Immediately return the Task ID to the frontend
    return {
        "message": "Task accepted. Please poll the status.",
        "task_id": task_id
    }

@app.get("/api/v1/agency/status/{task_id}")
async def get_status(task_id: str):
    """
    Frontend polls this endpoint via heartbeat to get the latest progress
    """
    config = {"configurable": {"thread_id": task_id}}
    
    # Read the latest State from the Checkpointer (async variant, so it also works with async checkpointers)
    state_snapshot = await agency_graph.aget_state(config)
    
    # If no snapshot is found, the ID is wrong or the background task hasn't written its first checkpoint yet
    if not state_snapshot or not state_snapshot.values:
        raise HTTPException(status_code=404, detail="Task not found or not started yet.")
    
    current_state = state_snapshot.values
    
    # Construct friendly data to return to the frontend
    response = {
        "task_id": task_id,
        "status": current_state.get("status", "running"),
        "current_agent": current_state.get("current_agent", "Unknown"),
    }
    
    # If completed, return the final article
    if current_state.get("status") == "completed":
        response["final_article"] = current_state.get("final_article")
        
    return response

if __name__ == "__main__":
    import uvicorn
    # Run: python main.py
    uvicorn.run(app, host="0.0.0.0", port=8000)
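
If you'd like to smoke-test the two endpoints before touching React, a small stdlib client is enough. This is only a sketch: `http_json` and `submit_and_wait` are helper names made up here, `BASE_URL` assumes the host and port from the `uvicorn.run` call above, and the `max_polls` cap is an arbitrary safety net.

```python
import json
import time
import urllib.error
import urllib.request
from typing import Callable, Optional

BASE_URL = "http://localhost:8000/api/v1/agency"  # assumes the server above is running locally


def http_json(method: str, url: str, payload: Optional[dict] = None) -> tuple[int, dict]:
    """Tiny stdlib JSON helper; returns (status_code, body)."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(
        url, data=data, method=method, headers={"Content-Type": "application/json"}
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status, json.loads(resp.read())
    except urllib.error.HTTPError as err:
        return err.code, {}  # e.g. 404 while no checkpoint exists yet


def submit_and_wait(
    topic: str,
    request: Callable[..., tuple[int, dict]] = http_json,
    interval_s: float = 2.0,
    max_polls: int = 60,
) -> dict:
    """Place an order, then poll until the task is completed or failed."""
    _, accepted = request("POST", f"{BASE_URL}/generate", {"topic": topic})
    task_id = accepted["task_id"]
    for _ in range(max_polls):
        code, body = request("GET", f"{BASE_URL}/status/{task_id}")
        if code == 200 and body.get("status") in ("completed", "failed"):
            return body
        time.sleep(interval_s)  # anything else (running, early 404): try again later
    raise TimeoutError(f"task {task_id} still running after {max_polls} polls")
```

With the server running, calling `submit_and_wait("Future of AI")` from a Python shell should return the same JSON the React client will consume, with `current_agent` set to "Editor" and the `final_article` field filled in.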

2. Frontend: React Heartbeat Polling

Now, the backend is ready. The frontend no longer needs to sit and wait for a minute. Let's use TypeScript + React hooks to write an elegant polling mechanism.

// Frontend: React Component (AgencyClient.tsx)
import React, { useState, useEffect, useRef } from 'react';

// Define interface types
interface TaskResponse {
  task_id: string;
  status: 'running' | 'completed' | 'failed';
  current_agent: string;
  final_article?: string;
}

export const AgencyClient: React.FC = () => {
  const [topic, setTopic] = useState<string>('');
  const [taskId, setTaskId] = useState<string | null>(null);
  const [status, setStatus] = useState<string>('idle');
  const [agentMsg, setAgentMsg] = useState<string>('');
  const [article, setArticle] = useState<string>('');
  
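  // Use ref to store the timer ID for easy cleanup.
  // ReturnType<typeof setInterval> type-checks in the browser without needing Node typings.
  const pollingIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null);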

  // 1. Submit task (Place order)
  const handleGenerate = async () => {
    setStatus('submitting');
    setArticle('');
    try {
      const res = await fetch('http://localhost:8000/api/v1/agency/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ topic })
      });
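      // fetch only rejects on network errors, so check the HTTP status ourselves
      if (!res.ok) throw new Error(`Submission rejected: HTTP ${res.status}`);
      const data = await res.json();
      setTaskId(data.task_id); // Got the order number!
      setStatus('running');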
    } catch (error) {
      console.error("Submission failed", error);
      setStatus('failed');
    }
  };

  // 2. Listen for taskId changes, start heartbeat polling
  useEffect(() => {
    if (!taskId || status !== 'running') return;

    // Define polling function
    const pollStatus = async () => {
      try {
        const res = await fetch(`http://localhost:8000/api/v1/agency/status/${taskId}`);
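        // A 404 right after submission can just mean the first checkpoint isn't written yet; retry on the next tick
        if (!res.ok) return;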
        
        const data: TaskResponse = await res.json();
        
        // Update UI to show which Agent is currently working
        setAgentMsg(`Current processing node: ${data.current_agent}...`);

        if (data.status === 'completed') {
          // Task completed, stop polling, display article
          setStatus('completed');
          setArticle(data.final_article || '');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        } else if (data.status === 'failed') {
          setStatus('failed');
          if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
        }
      } catch (error) {
        console.error("Polling failed", error);
      }
    };

    // Check immediately once, then every 2 seconds (Heartbeat: 2000ms)
    pollStatus();
    pollingIntervalRef.current = setInterval(pollStatus, 2000);

    // Clean up timer on component unmount
    return () => {
      if (pollingIntervalRef.current) clearInterval(pollingIntervalRef.current);
    };
  }, [taskId, status]);

  return (
    <div style={{ padding: '20px', maxWidth: '600px', margin: '0 auto' }}>
      <h2>AI Content Agency Client</h2>
      
      <div style={{ display: 'flex', gap: '10px', marginBottom: '20px' }}>
        <input 
          type="text" 
          value={topic} 
          onChange={(e) => setTopic(e.target.value)} 
          placeholder="Enter article topic, e.g., Future of AI"
          disabled={status === 'running'}
          style={{ flex: 1, padding: '8px' }}
        />
        <button 
          onClick={handleGenerate} 
          disabled={!topic || status === 'running'}
        >
          {status === 'running' ? 'Creating...' : 'Start Creation'}
        </button>
      </div>

      {/* Status Display Area */}
      {status === 'running' && (
        <div style={{ color: 'blue', fontStyle: 'italic' }}>
          <p>⏳ Working hard on your draft, please wait...</p>
          <p>🤖 {agentMsg}</p>
        </div>
      )}

      {/* Result Display Area */}
      {status === 'completed' && (
        <div style={{ border: '1px solid #ccc', padding: '15px', borderRadius: '8px', backgroundColor: '#f9f9f9' }}>
          <h3>🎉 Creation Complete:</h3>
          <pre style={{ whiteSpace: 'pre-wrap', fontFamily: 'inherit' }}>{article}</pre>
        </div>
      )}
    </div>
  );
};

Pitfalls and How to Avoid Them

Folks, while the code above runs perfectly fine, there are a few massive pitfalls you must be aware of in a production environment. As a mentor with 10 years of experience stepping into these traps, I need to give you a heads-up:

🚨 Pitfall 1: Memory Explosion (The Hidden Danger of MemorySaver)

In the demo, we used MemorySaver() as the Checkpointer. This means the states of all tasks are stored in the memory of the FastAPI process.

Consequence: If your website goes viral and you get 10,000 tasks a day, nothing evicts those checkpoints and your server's memory keeps climbing until it maxes out. And once you restart the FastAPI service, every stored state is lost entirely, so the frontend will only ever poll a 404!

How to Avoid: In a production environment, absolutely never use MemorySaver. Use AsyncSqliteSaver, AsyncPostgresSaver, or RedisSaver to persist the state to a database. Stored states then survive restarts, and with a shared database such as Postgres or Redis, any of several FastAPI nodes can answer a status poll. One caveat: a persistent checkpointer saves the state, not the running job. A graph that was mid-flight in a BackgroundTask when the process died will not resume on its own, so its status stays "running" until something re-runs it or marks it failed.
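
To make the restart problem concrete without pulling in any LangGraph packages, here is a stdlib-only illustration. It is not how LangGraph's savers are implemented; `InMemoryStatusStore`, `SqliteStatusStore`, and `survives_restart` are hypothetical names that only mimic the difference between state held in a process and state held in a database file.

```python
import os
import sqlite3
import tempfile


class InMemoryStatusStore:
    """State lives in the process, which is the MemorySaver situation."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def put(self, task_id: str, status: str) -> None:
        self._data[task_id] = status

    def get(self, task_id: str):
        return self._data.get(task_id)

    def close(self) -> None:
        pass  # nothing to release


class SqliteStatusStore:
    """State lives in a file, so a fresh process can read it back."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks (task_id TEXT PRIMARY KEY, status TEXT)"
        )
        self._conn.commit()

    def put(self, task_id: str, status: str) -> None:
        self._conn.execute("INSERT OR REPLACE INTO tasks VALUES (?, ?)", (task_id, status))
        self._conn.commit()

    def get(self, task_id: str):
        row = self._conn.execute(
            "SELECT status FROM tasks WHERE task_id = ?", (task_id,)
        ).fetchone()
        return row[0] if row else None

    def close(self) -> None:
        self._conn.close()


def survives_restart(make_store) -> bool:
    """Write a status, throw the store away (the 'restart'), then look again."""
    before = make_store()
    before.put("1234-5678", "running")
    before.close()
    after = make_store()  # a brand-new object, like a brand-new process
    found = after.get("1234-5678") == "running"
    after.close()
    return found


if __name__ == "__main__":
    db_path = os.path.join(tempfile.mkdtemp(), "tasks.db")
    print(survives_restart(InMemoryStatusStore))                 # False
    print(survives_restart(lambda: SqliteStatusStore(db_path)))  # True
```

The database-backed checkpointers give you the second behavior for the whole graph state, keyed by thread_id, without writing any of this yourself.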

🚨 Pitfall 2: The Blocking Crisis of BackgroundTasks

FastAPI's BackgroundTasks run async def tasks on the same Event Loop that serves your requests. If a Node in your LangGraph contains synchronous blocking code (like calling requests.get or doing CPU-intensive text processing), it will freeze the entire FastAPI app, causing the frontend's polling requests (GET status) to queue up and time out.

How to Avoid:

  1. Ensure every node in your Graph is truly asynchronous (use async def and async HTTP clients like httpx).
  2. If you must run synchronous, time-consuming legacy code, use asyncio.to_thread() to execute it in a thread pool, or simply introduce Celery / Redis Queue (RQ) to completely decouple the tasks into independent Worker processes.
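
The difference is easy to measure. In the sketch below (all names are invented for the demo), a `heartbeat` coroutine plays the part of the status endpoint, and `legacy_blocking_step` is a pretend synchronous node. We count how many heartbeats get through while the step runs, once called directly and once via `asyncio.to_thread()`.

```python
import asyncio
import time


def legacy_blocking_step() -> str:
    """Pretend legacy code: synchronous, and slow (0.3 s here)."""
    time.sleep(0.3)
    return "done"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Stands in for status polls being served by the same event loop."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def count_heartbeats(offload: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        await asyncio.to_thread(legacy_blocking_step)  # loop stays free
    else:
        legacy_blocking_step()  # loop is frozen for the whole call
    stop.set()
    await hb
    return len(ticks)


if __name__ == "__main__":
    print("called directly:", asyncio.run(count_heartbeats(offload=False)))
    print("via to_thread:  ", asyncio.run(count_heartbeats(offload=True)))
```

Called directly, only the very first heartbeat gets through; via `to_thread` you should see a dozen or so. Keep in mind that a thread only helps with blocking I/O and sleeps; truly CPU-bound work still competes for the GIL, which is where separate worker processes earn their keep.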

🚨 Pitfall 3: DDoSing Yourself (Polling Frequency Too High)

If the frontend's setInterval is set to 100 milliseconds, each client sends 10 requests per second, so 100 users placing orders simultaneously means your backend has to endure 1,000 queries per second.

How to Avoid: Back off as the task ages. Start by polling every 2 seconds; if it's not done after 10 seconds, switch to every 5 seconds; after 30 seconds, switch to every 10 seconds. (Strictly speaking this is a stepped schedule; a true Exponential Backoff that doubles the interval up to a cap works just as well.) Don't blindly hammer the server with high-frequency requests.
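
Here is that schedule written out as a small function, plus a count of how many requests a single client sends during a 60-second job. It's written in Python to match the backend code; `next_poll_delay_ms` and `count_polls` are illustrative names, and the thresholds are simply the ones from the paragraph above.

```python
def next_poll_delay_ms(elapsed_ms: int) -> int:
    """Stepped backoff: 2 s at first, 5 s after 10 s, 10 s after 30 s."""
    if elapsed_ms < 10_000:
        return 2_000
    if elapsed_ms < 30_000:
        return 5_000
    return 10_000


def count_polls(job_duration_ms: int) -> int:
    """How many status requests one client sends before the job finishes."""
    elapsed, polls = 0, 0
    while elapsed < job_duration_ms:
        polls += 1
        elapsed += next_poll_delay_ms(elapsed)
    return polls


if __name__ == "__main__":
    print(count_polls(60_000))  # 12 requests with the stepped schedule
    print(60_000 // 2_000)      # 30 requests at a fixed 2 s
    print(60_000 // 100)        # 600 requests at a fixed 100 ms
```

On the React side, the same idea means replacing setInterval with a chain of setTimeout calls, each one scheduled with the delay for the time elapsed so far.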


📝 Summary for This Episode

Today, we successfully led the AI Content Agency out of the terminal's basement, dressed it in a FastAPI suit, and completed its first "handshake" with a React frontend.

We learned:

  1. LangGraph's long-running nature dictates that we must adopt an asynchronous flow with frontend-backend separation.
  2. The Order Number Model: Use POST to trigger the task and return a Task ID, and GET to poll the status.
  3. LangGraph's Killer Feature: By directly utilizing checkpointer and thread_id, we perfectly bind the business Task ID to the underlying state graph. The backend doesn't need to maintain an extra state dictionary at all!

Next Episode Teaser: While polling is useful, it's not "sexy" enough. Have you noticed that when ChatGPT generates content, it pops out word by word (the typewriter effect)? In Episode 27, we will take on an advanced challenge: ditching polling and introducing SSE (Server-Sent Events) with Streaming output, allowing your frontend to see every single word typed by the Writer Agent in real-time!

Architects, get today's code running, and I'll see you in the next episode! Don't forget to add Postgres persistence to your APIs!