lesson-11

20 MIN READ | UPDATED: 2026-05-07

Hello everyone, and welcome back to the LangGraph Multi-Agent Masterclass. I'm your instructor.

In our previous session, we successfully introduced a memory mechanism to our "AI Content Agency," giving our Agent context-awareness. However, as our agency receives increasingly complex orders and client demands grow, we've hit a severe performance bottleneck.

Imagine this scenario: A client requests a "2024 AI Industry Development Report for China, the US, and Japan." Under our previous linear workflow, the Planner breaks the topic into three sub-topics (China, US, Japan) → the Researcher gathers data on China → once that finishes, it researches the US → then Japan → and finally hands everything over to the Writer.

What is this called? This is Sequential Execution. If gathering data for each country takes 10 seconds, the Research phase alone consumes 30 seconds! In a real-world business environment, your clients will lose patience, and your computing resources are left underutilized.
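
The arithmetic above fits in a tiny function. Sequential time is the sum of the task durations. Ideal parallel time is the duration of the slowest task. This is a rough model that assumes enough workers, no rate limits, and negligible scheduling overhead. `research_phase_seconds` is a name chosen only for this illustration.

```python
def research_phase_seconds(durations: list[float], parallel: bool) -> float:
    """Rough wall-clock estimate for the Research phase (ignores scheduling overhead)."""
    if not durations:
        return 0.0
    # Sequential: each Researcher waits for the previous one, so durations add up.
    # Parallel: all Researchers start together, so the slowest one sets the pace.
    return max(durations) if parallel else sum(durations)


per_country = [10.0, 10.0, 10.0]  # China, US, Japan at 10 s each
print(research_phase_seconds(per_country, parallel=False))  # 30.0
print(research_phase_seconds(per_country, parallel=True))   # 10.0
```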

How do top-tier content agencies actually operate? After breaking down the task, the Editor-in-Chief (Planner) simultaneously dispatches 3 Researchers. Alice researches China, Bob researches the US, and Charlie researches Japan. They split up, and whoever finishes tosses their findings into a shared pool. Finally, the Writer takes everything and drafts the article.

This brings us to today's topic: Parallel Execution and the Divide and Conquer mechanism. In distributed systems, we refer to this as Fan-out and Fan-in.
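
Before touching LangGraph, here is the same pattern in plain Python. It is a minimal sketch in which a standard-library thread pool stands in for the framework. Fan-out submits one task per sub-topic, and fan-in waits for every result before anything downstream runs. `research` and `fan_out_fan_in` are hypothetical helpers. The 0.2-second sleep stands in for a real search or LLM call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def research(sub_topic: str) -> str:
    """Hypothetical Researcher: simulates a slow search with a short sleep."""
    time.sleep(0.2)
    return f"Findings on {sub_topic}"


def fan_out_fan_in(sub_topics: list[str]) -> list[str]:
    # Fan-out: one worker per sub-topic, all running at the same time.
    with ThreadPoolExecutor(max_workers=max(1, len(sub_topics))) as pool:
        # Fan-in: map blocks until every task is done and keeps input order.
        return list(pool.map(research, sub_topics))


if __name__ == "__main__":
    start = time.perf_counter()
    results = fan_out_fan_in(["China", "US", "Japan"])
    print(results)
    print(f"{time.perf_counter() - start:.2f}s")  # roughly 0.2s rather than 0.6s
```

`pool.map` returns results in input order, no matter which task finishes first.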

Ready to refactor our workflow? Let's dive in!


🎯 Learning Objectives for This Session

Through this hands-on session, you will master the following advanced skills:

  1. Understand the Fan-out & Fan-in Architecture: Grasp the principles of dynamic dispatch and result aggregation in multi-agent collaboration.
  2. Master the LangGraph Send API: Say goodbye to static conditional branches and learn how to dynamically create multiple node instances based on runtime data.
  3. Master Concurrent State Aggregation (State Reduction): Handle the write conflict that occurs when multiple nodes update the same State field at the same time.
  4. Refactor the Agency's Core Flow: Equip our content agency with a "multi-threaded" engine, so the research phase takes roughly as long as its slowest task instead of the sum of all tasks.

📖 Concept Breakdown

In LangGraph, there are two ways to implement parallelism. The first is static parallelism: one node connects directly to three fixed nodes. This is simple but lacks flexibility. In our Agency, the number of topics broken down by the Planner is dynamic (sometimes 2, sometimes 5).

Therefore, we need the second approach: Dynamic Parallelism.

LangGraph provides an elegant tool for this: the Send object. A conditional edge's routing function can return a list like [Send("node_name", payload_1), Send("node_name", payload_2)]. When it does, LangGraph schedules one invocation of node_name per Send, gives each invocation its own payload as input, and runs them concurrently in the same step.
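
To pin down that contract, here is a toy model in plain Python. It is not LangGraph's implementation. `ToySend` and `run_sends` are hypothetical names, and a thread pool stands in for LangGraph's scheduler. It does mirror the behavior described above: every Send names a target node, carries its own payload, and becomes a separate concurrent invocation of that node.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToySend:
    """Hypothetical stand-in for a Send: a target node name plus its own payload."""
    node: str
    arg: Any


def run_sends(sends: list[ToySend], nodes: dict[str, Callable[[Any], dict]]) -> list[dict]:
    """Run one invocation per ToySend concurrently and return each node's update."""
    if not sends:
        return []
    with ThreadPoolExecutor(max_workers=len(sends)) as pool:
        # Each invocation receives only its own payload, not the parent state.
        futures = [pool.submit(nodes[s.node], s.arg) for s in sends]
        return [f.result() for f in futures]


def researcher(payload: dict) -> dict:
    return {"research_results": [f"Report on {payload['sub_topic']}"]}


sends = [ToySend("researcher", {"sub_topic": t}) for t in ["Topic A", "Topic B", "Topic C"]]
print(run_sends(sends, {"researcher": researcher}))
```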

It's just like the Planner standing in the middle of the office and shouting, "I need 3 Researchers!" Instantly, 3 Researchers spring into action, taking their respective sub-topics and getting to work.

Pay attention! Look at the Mermaid architecture diagram of our refactored Agency below:

graph TD
    Start((Start)) --> Planner[Planner Node\nEditor breaks down topics]
    
    Planner -- "Fan-out\nReturns [Send, Send, Send]" --> Dispatch{Dispatch Router}
    
    Dispatch -.->|Send payload 1| Researcher1[Researcher Instance 1\nAlice researches Topic A]
    Dispatch -.->|Send payload 2| Researcher2[Researcher Instance 2\nBob researches Topic B]
    Dispatch -.->|Send payload 3| Researcher3[Researcher Instance 3\nCharlie researches Topic C]
    
    Researcher1 -.->|Fan-in\nAppend to List| Aggregate((State Aggregation))
    Researcher2 -.->|Fan-in\nAppend to List| Aggregate
    Researcher3 -.->|Fan-in\nAppend to List| Aggregate
    
    Aggregate --> Writer[Writer Node\nWriter aggregates & drafts]
    Writer --> End((End))

    classDef core fill:#2d3436,stroke:#74b9ff,stroke-width:2px,color:#fff;
    classDef agent fill:#0984e3,stroke:#fff,stroke-width:2px,color:#fff;
    classDef router fill:#d63031,stroke:#fff,stroke-width:2px,color:#fff;
    
    class Planner,Writer core;
    class Researcher1,Researcher2,Researcher3 agent;
    class Dispatch router;

Workflow Breakdown:

  1. Planner determines the article outline and generates a sub_topics list.
  2. Dispatch Router iterates through this list, generates multiple Send objects, and triggers the Fan-out.
  3. Multiple Researcher instances run in parallel (LangGraph handles the concurrency under the hood).
  4. After each Researcher finishes, it writes its results to the State. The target field must use a Reducer (like operator.add) to achieve Fan-in aggregation. Without one, LangGraph rejects the parallel writes with an InvalidUpdateError, because a plain field can accept only one value per step.
  5. Only after all Researchers have completed their tasks will the flow proceed to the Writer.
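
Step 4 is where most bugs hide, so here is a simplified, standard-library model of it. It illustrates the idea under stated assumptions and is not LangGraph's source. `apply_step_updates` and `ConcurrentWriteError` are hypothetical. A field with a reducer folds every update from one step into its value. A plain field accepts at most one write per step and rejects the rest, which matches how LangGraph treats a field without a Reducer.

```python
import operator
from typing import Any, Callable, Optional


class ConcurrentWriteError(Exception):
    """Hypothetical error mirroring the rejection of multiple writes in one step."""


def apply_step_updates(current: Any, updates: list[Any],
                       reducer: Optional[Callable[[Any, Any], Any]] = None) -> Any:
    """Merge the writes that parallel nodes made to ONE field during ONE step."""
    if not updates:
        return current
    if reducer is None:
        # Plain field: only a single write per step is unambiguous.
        if len(updates) > 1:
            raise ConcurrentWriteError(f"got {len(updates)} writes in one step")
        return updates[0]
    # Reducer field: fold each update into the running value (list concatenation here).
    for update in updates:
        current = reducer(current, update)
    return current


writes = [["report A"], ["report B"], ["report C"]]
print(apply_step_updates([], writes, reducer=operator.add))
try:
    apply_step_updates([], writes)
except ConcurrentWriteError as err:
    print("rejected:", err)
```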

💻 Hands-on Code Walkthrough

Enough talk, show me the code. To let you run and experience this directly, I'll use mocked LLM calls in this session's demo, using time.sleep to demonstrate the power of parallelism.

Step 1: Define a State that Supports Aggregation

This is the easiest place to stumble! When defining research_results in our State, we must use Annotated and operator.add. This tells LangGraph: "When multiple nodes write to this field at the same time, do not overwrite; instead, append them together!"

import operator
import time
from typing import Annotated, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# ==========================================
# 1. State Definition
# ==========================================
class AgencyState(TypedDict):
    main_topic: str                  # The main topic provided by the client
    sub_topics: List[str]            # List of sub-topics broken down by the Planner
    # ⚠️ Core Concept: Use operator.add to ensure parallel writes are appended to the list, not overwritten
    research_results: Annotated[List[str], operator.add] 
    final_article: str               # The final generated article

# To pass specific parameters via Send, we define a dedicated State for the Researcher
class ResearcherState(TypedDict):
    sub_topic: str
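
How can a framework "see" the reducer at all? `Annotated` metadata survives at runtime and can be read back with the standard `typing` module. The sketch below does this for a copy of `AgencyState`. `find_reducers` is a hypothetical helper that shows the general mechanism. It makes no claim about exactly how LangGraph parses state schemas.

```python
import operator
from typing import Annotated, List, TypedDict, get_args, get_origin, get_type_hints


class AgencyState(TypedDict):
    main_topic: str
    sub_topics: List[str]
    research_results: Annotated[List[str], operator.add]
    final_article: str


def find_reducers(schema: type) -> dict:
    """Map each Annotated field to the first metadata item (here, its reducer)."""
    # include_extras=True keeps the Annotated wrapper instead of stripping it.
    hints = get_type_hints(schema, include_extras=True)
    return {
        name: get_args(hint)[1]  # (base_type, metadata...) -> first metadata item
        for name, hint in hints.items()
        if get_origin(hint) is Annotated
    }


print(find_reducers(AgencyState))  # only research_results carries a reducer
```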

Step 2: Write Node Logic (Nodes)

Next, let's implement the three core employees of our Agency: the Planner, the Researcher, and the Writer.

# ==========================================
# 2. Node Functions
# ==========================================

def planner_node(state: AgencyState):
    """
    The Planner: Breaks down the main topic into sub-topics.
    """
    print(f"👨‍💼 [Planner] Breaking down main topic: '{state['main_topic']}'...")
    # Mock LLM breakdown logic
    mock_sub_topics = [
        f"{state['main_topic']}: Historical Background",
        f"{state['main_topic']}: Core Technologies",
        f"{state['main_topic']}: Future Trends"
    ]
    print(f"👨‍💼 [Planner] Breakdown complete: generated {len(mock_sub_topics)} sub-topics. Ready to dispatch!")
    
    # Write the broken-down sub-topics into the State
    return {"sub_topics": mock_sub_topics}


def researcher_node(state: ResearcherState):
    """
    The Researcher: Conducts deep research on a SINGLE sub-topic.
    """
    topic = state["sub_topic"]
    print(f"   🔍 [Researcher] Task received, searching: '{topic}'...")
    
    # Mock a time-consuming web search and reading process (using sleep)
    # Note: time.sleep blocks its thread. With graph.invoke, LangGraph runs parallel
    # branches on a thread pool, so these sleeps overlap. In an async graph
    # (graph.ainvoke), write an async node that uses `await asyncio.sleep(...)` instead.
    time.sleep(2)
    
    mock_result = f"[Research report on '{topic}']: Some highly valuable in-depth findings..."
    print(f"   ✅ [Researcher] Finished searching: '{topic}'")
    
    # ⚠️ Core Concept: The key in the returned dict must match the field name to be aggregated in AgencyState
    # Because we used operator.add, the list returned here will be appended to the main list
    return {"research_results": [mock_result]}


def writer_node(state: AgencyState):
    """
    The Writer: Aggregates all research reports and writes the final article.
    """
    print("\n✍️ [Writer] All research received, writing furiously...")
    results = state.get("research_results", [])
    
    print(f"✍️ [Writer] Received {len(results)} research reports in total.")
    
    # Mock writing process
    article = f"# {state['main_topic']}: An In-Depth Analysis\n\n"
    for i, res in enumerate(results, 1):
        article += f"## Section {i}\n{res}\n\n"
        
    print("✍️ [Writer] Draft complete!")
    return {"final_article": article}
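
The `time.sleep` mock is fine for `graph.invoke`. If you later switch to async execution (`graph.ainvoke` with `async def` nodes), though, a blocking sleep would stall the event loop. Below is a standard-library sketch of an async Researcher. It assumes `asyncio.gather` as a stand-in for LangGraph's scheduler and shortens the delay to 0.2 seconds. `async_researcher_node` and `run_all` are hypothetical names.

```python
import asyncio
import time


async def async_researcher_node(state: dict) -> dict:
    """Async variant of researcher_node: awaits instead of blocking the thread."""
    topic = state["sub_topic"]
    # asyncio.sleep yields control, so the other Researchers keep running meanwhile.
    await asyncio.sleep(0.2)
    return {"research_results": [f"[Research report on '{topic}']"]}


async def run_all(sub_topics: list[str]) -> list[str]:
    # gather runs the coroutines concurrently and returns results in input order.
    updates = await asyncio.gather(
        *(async_researcher_node({"sub_topic": t}) for t in sub_topics)
    )
    # Concatenate the per-node lists, the same way operator.add would.
    results: list[str] = []
    for update in updates:
        results += update["research_results"]
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(run_all(["A", "B", "C"])))
    print(f"{time.perf_counter() - start:.2f}s")  # about 0.2s, not 0.6s
```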

Step 3: The Magic of the Dispatch Router

This is the soul of the Fan-out. We don't connect the Planner and Researcher directly with an edge; instead, we use a conditional routing function that returns Send objects.

# ==========================================
# 3. Dynamic Dispatch Router
# ==========================================

def dispatch_researchers(state: AgencyState):
    """
    The Dispatcher: Dynamically creates Researcher instances based on sub_topics.
    """
    sub_topics = state.get("sub_topics", [])
    
    # ⚠️ Core Concept: Create a Send object for each sub-topic
    # Send("target_node_name", State_Payload_passed_to_that_node)
    sends = []
    for topic in sub_topics:
        sends.append(Send("researcher", {"sub_topic": topic}))
        
    return sends

Step 4: Build & Run

# ==========================================
# 4. Build the Graph
# ==========================================
builder = StateGraph(AgencyState)

# Add nodes
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("writer", writer_node)

# Set control flow
builder.add_edge(START, "planner")

# Trigger dynamic dispatch (Fan-out) after Planner finishes
builder.add_conditional_edges(
    "planner", 
    dispatch_researchers, 
    # Declare the nodes that Send might route to
    ["researcher"] 
)

# After all researcher nodes finish, flow uniformly to the writer (Fan-in)
builder.add_edge("researcher", "writer")
builder.add_edge("writer", END)

# Compile the graph
graph = builder.compile()

# ==========================================
# 5. Test Run
# ==========================================
if __name__ == "__main__":
    print("🚀 Starting the AI Content Agency parallel workflow...\n")
    
    initial_state = {
        "main_topic": "Quantum Computing",
        "research_results": [] # Initialize as an empty list
    }
    
    # Record start time
    start_time = time.time()
    
    # Run the Graph
    final_state = graph.invoke(initial_state)
    
    # Record end time
    end_time = time.time()
    
    print("\n" + "="*40)
    print("🏆 Final article preview:")
    print("="*40)
    print(final_state["final_article"])
    print("="*40)
    print(f"⏱️ Total time: {end_time - start_time:.2f} seconds")
    # If sequential, 3 topics sleeping for 2s each would take at least 6 seconds.
    # Because it's parallel, the total time should be just over 2 seconds!

When you run this code, you should see console output similar to the following. The order in which the Researchers finish may vary between runs:

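🚀 Starting the AI Content Agency parallel workflow...

👨‍💼 [Planner] Breaking down main topic: 'Quantum Computing'...
👨‍💼 [Planner] Breakdown complete: generated 3 sub-topics. Ready to dispatch!
   🔍 [Researcher] Task received, searching: 'Quantum Computing: Historical Background'...
   🔍 [Researcher] Task received, searching: 'Quantum Computing: Core Technologies'...
   🔍 [Researcher] Task received, searching: 'Quantum Computing: Future Trends'...
   ✅ [Researcher] Finished searching: 'Quantum Computing: Historical Background'
   ✅ [Researcher] Finished searching: 'Quantum Computing: Future Trends'
   ✅ [Researcher] Finished searching: 'Quantum Computing: Core Technologies'

✍️ [Writer] All research received, writing furiously...
✍️ [Writer] Received 3 research reports in total.
✍️ [Writer] Draft complete!

========================================
🏆 Final article preview:
========================================
# Quantum Computing: An In-Depth Analysis

## Section 1
[Research report on 'Quantum Computing: Historical Background']: Some highly valuable in-depth findings...

## Section 2
[Research report on 'Quantum Computing: Core Technologies']: Some highly valuable in-depth findings...

## Section 3
[Research report on 'Quantum Computing: Future Trends']: Some highly valuable in-depth findings...

========================================
⏱️ Total time: 2.03 seconds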

Look at the final execution time: 2.03 seconds! Three tasks that take 2 seconds each ran concurrently, so the total is roughly the duration of a single task rather than the 6+ seconds a sequential run would need. That is the payoff of parallel divide-and-conquer: your Agency's research phase now scales with its slowest task, not with the number of tasks.


Pitfalls & How to Avoid Them

As your instructor, I must not only teach you how to write working code but also warn you about where you might bleed in a production environment. When using parallel mechanisms, there are three "massive pitfalls" you must guard against:

💣 Pitfall 1: Forgetting to Use a Reducer, Leading to Ruthless State Overwrites

Symptoms: You dispatch 5 Researchers, and the run crashes at the Fan-in with an InvalidUpdateError. The error says the research_results key can receive only one value per step. Diagnosis: When defining research_results: list[str] in the TypedDict, you forgot to wrap it as Annotated[..., operator.add]. Without a Reducer, a field simply keeps its latest value. When 5 parallel nodes return {"research_results": [...]} in the same step, there is no single "latest" value, so LangGraph raises an error instead of picking one. Cure: Always add a Reducer to the target field of a Fan-in operation.

💣 Pitfall 2: Concurrency Storms Triggering API Rate Limits (429 Error)

Symptoms: The Planner breaks the topic into 20 sub-topics and instantly fires 20 Researcher requests at the OpenAI API, which immediately rejects them with HTTP 429 Too Many Requests. Diagnosis: The Fan-out is too aggressive and lacks concurrency control. Cure: In production, if you don't know how many tasks the Planner will generate, never run the Fan-out unprotected. Implement Retry with Exponential Backoff in your underlying LLM call logic (e.g., using the tenacity library). You can also cap the number of concurrently running nodes via max_concurrency in LangGraph's RunnableConfig, e.g. graph.invoke(initial_state, config={"max_concurrency": 5}).

💣 Pitfall 3: The Straggler Effect and the Risk of Hanging (The Slowest Intern Effect)

Symptoms: The total execution time is unusually long. Diagnosis: A characteristic of the Fan-in mechanism is that it must wait for all dispatched Send nodes to finish executing before moving to the next node (the Writer). If Alice and Bob finish their research in 2 seconds, but Charlie experiences network lag and gets stuck for 60 seconds, the entire system has to wait 60 seconds for Charlie. Cure: Set strict Timeouts for the LLM calls or search tools in your Researcher nodes. If a Researcher times out, catch the exception and return a fallback result like "No data available for this topic at the moment." Never let the entire Graph hang indefinitely.
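
A minimal sketch of that cure follows. It assumes an async Researcher and a hypothetical `slow_search` coroutine in place of a real search tool. `asyncio.wait_for` cancels any call that exceeds its time budget, and the node returns the fallback report instead of stalling the Fan-in. The 0.3-second budget is an arbitrary example value.

```python
import asyncio

FALLBACK = "No data available for this topic at the moment."


async def slow_search(topic: str, delay: float) -> str:
    """Hypothetical search tool; `delay` simulates network latency."""
    await asyncio.sleep(delay)
    return f"Findings on {topic}"


async def researcher_with_timeout(topic: str, delay: float, timeout: float = 0.3) -> dict:
    try:
        # wait_for cancels slow_search if it runs longer than `timeout` seconds.
        result = await asyncio.wait_for(slow_search(topic, delay), timeout=timeout)
    except asyncio.TimeoutError:
        result = FALLBACK
    return {"research_results": [result]}


async def main() -> list[dict]:
    # Alice and Bob answer quickly; Charlie's lookup would take 60 seconds.
    return await asyncio.gather(
        researcher_with_timeout("China", 0.1),
        researcher_with_timeout("US", 0.1),
        researcher_with_timeout("Japan", 60),
    )


if __name__ == "__main__":
    for update in asyncio.run(main()):
        print(update)
```

With Charlie's lookup capped, `main()` finishes in well under a second. The Writer would then receive a fallback entry for Japan instead of waiting a full minute.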


📝 Summary for This Session

Today, we equipped the "AI Content Agency" with a turbocharged parallel processing engine.

We learned:

  1. Fan-out: Dynamically cloning nodes by returning a list of Send objects via conditional edges.
  2. Fan-in: Safely aggregating data generated by concurrent nodes using Annotated and operator.add.
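  3. Performance Leap: Cutting the research phase's wall-clock time from the sum of N task durations, $O(N)$, to roughly the duration of the slowest task, approximately $O(1)$ in N, as long as there are enough workers and no rate limits.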

Now, our Planner strategizes, our Researchers run in parallel, and our Writer drafts like magic. However, if you look closely at the generated articles, you'll realize: if the data the Researchers bring back is garbage, the Writer will produce garbage (Garbage in, garbage out).

Isn't the agency missing a gatekeeper? Exactly! In our next session (Part 12), we will introduce the Human-in-the-loop mechanism. Before the Writer starts drafting, or after the draft is submitted, we will allow a real-world Editor-in-Chief (you) to intervene in the workflow to approve, reject, or modify the content.

Let the Agents run wild, but keep the steering wheel in human hands. See you in the next session! 👋