lesson-12

20 MIN READ | UPDATED: 2026-05-07

Solving Multi-Node Concurrent Array Writes with operator.add and Custom Aggregation Functions

Welcome back to the LangGraph Multi-Agent Masterclass, future top-tier AI architects! I'm your host.

In our last episode, we successfully got our "AI Content Agency" up and running. Our Planner (Editor-in-Chief) was able to break down tasks, hand them over to the Researcher to gather information, and finally toss them to the Writer to draft the article.

However, I wonder if any of you ran into a blood-pressure-spiking, almost paranormal bug when running the code yourselves after class: "I clearly dispatched 3 Researchers to Google, Twitter, and Arxiv to gather materials. So why does the data that reaches the Writer contain the work of only 1 of them?! Did the other two's hard work just vanish into thin air?"

Congratulations, you've just stepped into the most classic pitfall in multi-agent concurrent state management—State Overwrite.

In today's lesson, I will unveil the most elegant piece of LangGraph black magic, and the one beginners most often overlook: Reducers. We will completely solve the problem of multi-node concurrent writes and teach your agent team to truly "collaborate" instead of "sabotaging" each other.


🎯 Learning Objectives for This Episode

By the end of today's class, you will have mastered the following hardcore skills:

  1. Understand the underlying logic of State updates: Figure out why LangGraph's default "Last Write Wins" mechanism leads to data loss.
  2. Master the use of built-in Reducers: Proficiently use typing.Annotated and operator.add to append to lists safely and incrementally, even when several nodes write at the same time.
  3. Write advanced custom Reducers: Hand-code aggregation functions that not only merge data but also perform data cleaning tasks like "deduplication" and "null-value filtering" during the merge.
  4. Refactor the Agency's concurrent search flow: Upgrade our Researcher into a multi-threaded concurrent search architecture, laying a solid data foundation for complex article drafting in the next steps.

📖 Theoretical Analysis

1. Why is your data getting lost? (The Overwrite Problem)

In LangGraph, the soul of the entire system is the State. You can think of the State as a "shared whiteboard" passed around among various Agents.

By default, when a node finishes executing and returns a dictionary, LangGraph takes this dictionary and updates the whiteboard. Its default update logic is extremely simple and brutal: Direct Overwrite.

Imagine this scenario: The Planner says: "Go look up the latest advancements in 'Quantum Computing'." So, the system concurrently launches three Researchers:

  • Researcher_Google finds A, returning {"research_materials": ["A"]}
  • Researcher_Arxiv finds B, returning {"research_materials": ["B"]}
  • Researcher_Twitter finds C, returning {"research_materials": ["C"]}

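Because they are executing concurrently, a pure overwrite rule can keep only one result: whoever slaps theirs on the whiteboard last wins. If the Twitter node finishes 1 millisecond later, research_materials ends up as ["C"], and A and B are completely wiped out. (Strictly speaking, current LangGraph releases catch this particular case. When parallel nodes write to the same plain field in one step, the run fails with an InvalidUpdateError instead of silently keeping one value. Sequential nodes, however, really do overwrite each other without any warning.)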

This is a classic Race Condition.
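To see the overwrite rule in isolation, here is a toy model in plain Python. It illustrates the idea and is not LangGraph's actual update code. apply_overwrite is a hypothetical helper that applies each node's returned dictionary with dict.update, in the order the nodes finish.

```python
# Toy model of "last write wins" (an illustration, not LangGraph's real internals):
# every node update is applied to the shared state with a plain dict.update.

def apply_overwrite(state: dict, updates: list) -> dict:
    """Apply node updates one after another; later values replace earlier ones."""
    new_state = dict(state)  # copy so the caller's state is left untouched
    for update in updates:
        new_state.update(update)
    return new_state


state = {"topic": "Quantum Computing", "research_materials": []}

# The three researchers finish in this order: Google, Arxiv, Twitter.
updates = [
    {"research_materials": ["A"]},  # Researcher_Google
    {"research_materials": ["B"]},  # Researcher_Arxiv
    {"research_materials": ["C"]},  # Researcher_Twitter
]

final_state = apply_overwrite(state, updates)
print(final_state["research_materials"])  # ['C']: A and B are gone
```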

2. The Black Magic of Reducers: From "Overwrite" to "Merge"

To solve this problem, LangGraph introduced the concept of a Reducer. At its core, a Reducer is an aggregator: a function that takes the field's current value and the incoming update and returns the new value. It tells LangGraph: "Hey, when a node wants to update this field, don't just overwrite it. Please use the method I provide to mash the new data and the old data together."

In Python, we use typing.Annotated to bind a Reducer to a State field.
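Here is a rough sketch of what binding a Reducer with Annotated buys you, assuming a simplified update loop. apply_updates and DemoState are hypothetical names for illustration. LangGraph's real machinery (channels, steps, checkpoints) is more involved, but the core idea is similar: read the reducer out of the Annotated metadata and use it to combine the old value with each update.

```python
import operator
from typing import Annotated, List, TypedDict, get_args, get_origin, get_type_hints


class DemoState(TypedDict):
    topic: str                                              # plain field: overwrite
    research_materials: Annotated[List[str], operator.add]  # reducer-bound field


def apply_updates(state_cls, state: dict, updates: list) -> dict:
    """Hypothetical helper: apply node updates, honoring Annotated reducers.

    This mimics the idea behind LangGraph's behavior; it is not LangGraph's code.
    Assumes reducer-bound fields are lists (hence the [] default below).
    """
    hints = get_type_hints(state_cls, include_extras=True)  # keep Annotated metadata
    new_state = dict(state)
    for update in updates:
        for key, value in update.items():
            hint = hints[key]
            if get_origin(hint) is Annotated:
                # get_args(Annotated[T, f]) -> (T, f); the reducer is the last metadata item
                reducer = get_args(hint)[-1]
                new_state[key] = reducer(new_state.get(key, []), value)
            else:
                new_state[key] = value  # no reducer: last write wins
    return new_state


state = {"topic": "Quantum Computing", "research_materials": []}
updates = [
    {"research_materials": ["A"]},
    {"research_materials": ["B"]},
    {"research_materials": ["C"]},
]
result = apply_updates(DemoState, state, updates)
print(result["research_materials"])  # ['A', 'B', 'C']
```

Fields without Annotated metadata still fall back to overwrite. In this sketch, two successive writes to topic leave only the second one.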

Let's take a look at the core workflow of the AI Agency we are going to refactor today:

graph TD
    classDef planner fill:#f9f0ff,stroke:#d8b4e2,stroke-width:2px;
    classDef researcher fill:#e6f7ff,stroke:#91d5ff,stroke-width:2px;
    classDef writer fill:#f6ffed,stroke:#b7eb8f,stroke-width:2px;
    classDef state fill:#fffb8f,stroke:#d4b106,stroke-width:2px,stroke-dasharray: 5 5;

    START((Start)) --> Planner["Planner Agent<br/>Generates multiple search keywords"]:::planner
    Planner --> |Fan-out Concurrent| R1[Researcher: Google Search]:::researcher
    Planner --> |Fan-out Concurrent| R2[Researcher: Paper Retrieval]:::researcher
    Planner --> |Fan-out Concurrent| R3[Researcher: Social Media]:::researcher
    R1 -.-> |Returns Result A| State_Materials[("State: research_materials<br/>Reducer: operator.add")]:::state
    R2 -.-> |Returns Result B| State_Materials
    R3 -.-> |Returns Result C| State_Materials
    State_Materials -.-> |Fan-in Aggregation: A+B+C| Writer["Writer Agent<br/>Drafts article based on all materials"]:::writer
    Writer --> END((End))

In the architecture above, when the concurrent Researchers return their results, the research_materials field (equipped with a Reducer) acts like a reservoir, safely collecting A, B, and C, and finally feeding them to the Writer as a complete list.


💻 Practical Code Walkthrough

Enough talk, show me the code. We will use Python to implement this AI Agency workflow with concurrent Researchers.

Step 1: Environment and Basic Package Imports

import operator
import random  # Simulate random latency and occasional failures
import time    # Simulate LLM / network call latency
from typing import Annotated, List, TypedDict

from langgraph.graph import StateGraph, START, END

Step 2: Define the State with a Reducer (Core Focus!)

Pay attention! This is the core of today's lesson. We need to define an AgencyState.

# --- Custom Reducer Function ---
# Why not just use operator.add?
# Because operator.add raises a TypeError when asked to add None to a list (list + None)!
# In real business scenarios, a Researcher might fail and return None, so we need more robust merge logic.
def safe_merge_materials(left: List[str] | None, right: List[str] | None) -> List[str]:
    """
    Custom Reducer: Safely merges lists and removes duplicates.
    left: The value currently stored in the state
    right: The new value returned by the node (may be None if the node failed)
    """
    if not left:
        left = []
    if not right:
        return left
    
    # Merge, then deduplicate while preserving order (real-world deduplication may need fuzzier matching; this is a simple demo)
    merged = left + right  # `+` builds a new list, so the old state value is never mutated
    # dict.fromkeys keeps only the first occurrence of each item, in insertion order
    return list(dict.fromkeys(merged))

# --- Define Graph State ---
class AgencyState(TypedDict):
    # Task topic (Standard field, defaults to overwrite)
    topic: str 
    
    # Specific search tasks assigned to Researchers (Standard field, overwrite)
    search_tasks: List[str] 
    
    # Collected materials (Magic field! Uses Annotated to bind our custom safe_merge_materials)
    # When any node returns {"research_materials": ["new data"]},
    # LangGraph automatically computes: new_value = safe_merge_materials(current_value, ["new data"])
    research_materials: Annotated[List[str], safe_merge_materials]
    
    # Final drafted article (Standard field, overwrite)
    final_article: str

Instructor's Commentary: Do you see the power of Annotated? Many beginners just write research_materials: List[str] and then can't figure out for the life of them why their parallel branches lose data or crash with update errors. Remember, in LangGraph, if you want incremental updates, you MUST use Annotated!
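A reducer is just a plain function, so you can test it in isolation before wiring it into the graph. The checks below are a suggestion and not part of the original lesson. They restate safe_merge_materials so the snippet runs on its own.

```python
from typing import List, Optional


def safe_merge_materials(left: Optional[List[str]], right: Optional[List[str]]) -> List[str]:
    """Same logic as the reducer above: None-tolerant merge with order-preserving dedup."""
    if not left:
        left = []
    if not right:
        return left
    return list(dict.fromkeys(left + right))


# 1. A failed researcher returns None: the existing materials survive unchanged.
assert safe_merge_materials(["A"], None) == ["A"]

# 2. The very first write, when nothing has been collected yet.
assert safe_merge_materials(None, ["A"]) == ["A"]

# 3. Normal appends keep arrival order.
assert safe_merge_materials(["A"], ["B", "C"]) == ["A", "B", "C"]

# 4. Duplicates are dropped; the first occurrence keeps its position.
assert safe_merge_materials(["A", "B"], ["B", "C"]) == ["A", "B", "C"]

print("All reducer checks passed")
```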

Step 3: Write Node Logic

Next, we implement the Planner, multiple Researchers, and the Writer. To help you see the execution order clearly, I've added detailed print statements in the code.

def planner_node(state: AgencyState):
    """
    Planner Node: Receives the main topic and breaks it down into multiple specific search tasks.
    """
    print(f"👨‍💼 [Planner] Breaking down topic: {state['topic']}")
    
    # Simulate the LLM thinking process, hardcoding the breakdown results directly
    tasks = [
        f"History of {state['topic']}",
        f"Latest technical breakthroughs in {state['topic']}",
        f"Commercial application cases of {state['topic']}"
    ]
    
    print(f"👨‍💼 [Planner] Breakdown complete, dispatching {len(tasks)} concurrent search tasks.")
    # Returning a standard overwrite field here
    return {"search_tasks": tasks}

def researcher_node(state: AgencyState, task: str):
    """
    Researcher Node helper: performs a single search task and returns a partial state update.
    It is not registered directly; each researcher node in the graph wraps it in a lambda that picks its own task.
    Note: In a real LangGraph application, this kind of fan-out is usually done with dynamic routing via the Send API.
    For the sake of clarifying the Reducer concept in this episode, we use static concurrency for demonstration.
    """
    print(f"🔍 [Researcher] Starting search for subtask: {task}...")
    # Simulate network request latency
    time.sleep(random.uniform(0.5, 1.5))
    
    # Simulate searched data
    mock_result = f"In-depth report snippet on [{task}]"
    
    # Sometimes a search might fail (simulating empty data to test our safe_merge_materials)
    if random.random() < 0.1: 
        print(f"⚠️ [Researcher] Search failed for {task}, no materials found.")
        return {"research_materials": None}
        
    print(f"✅ [Researcher] Search complete: {task}")
    
    # Key point: Writing to the research_materials field.
    # Because it's bound to a Reducer, this list will be appended to the original list, not overwritten!
    return {"research_materials": [mock_result]}

def writer_node(state: AgencyState):
    """
    Writer Node: Collects all materials and writes the final article.
    """
    materials = state.get("research_materials", [])
    print(f"\n✍️ [Writer] Received {len(materials)} research materials, starting to draft...")
    
    if not materials:
        article = "Sorry, unable to draft the article due to a lack of materials."
    else:
        # Simulate the drafting process
        content = "\n".join([f"- {m}" for m in materials])
        article = f"《In-Depth Analysis of {state['topic']}》\n\nSynthesizing multiple sources, we have reached the following conclusions:\n{content}\n\n(End of Article)"
        
    print("✍️ [Writer] Drafting complete!")
    return {"final_article": article}
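Step 4 below registers three hand-written lambdas. If you later want to register researchers in a loop, a small factory function is the usual Python pattern, and it avoids a classic late-binding trap. make_researcher is a hypothetical helper, not a LangGraph API. The functions it returns are ordinary callables that take a state dict, like the lambdas in Step 4.

```python
from typing import Callable, Dict, List

State = Dict[str, List[str]]
NodeFn = Callable[[State], State]


def make_researcher(index: int) -> NodeFn:
    """Hypothetical node factory: builds a node that always handles search_tasks[index]."""
    def researcher(state: State) -> State:
        # `index` was fixed when make_researcher(...) was called, so each node keeps its own value
        task = state["search_tasks"][index]
        return {"research_materials": [task]}
    return researcher


state: State = {"search_tasks": ["History", "Breakthroughs", "Applications"]}

# Correct: one fresh closure per call, each remembering a different index
good_nodes = [make_researcher(i) for i in range(3)]
print([node(state)["research_materials"][0] for node in good_nodes])
# ['History', 'Breakthroughs', 'Applications']

# Buggy: each lambda reads `i` only when it is *called*, after the loop has finished (i == 2)
bad_nodes = []
for i in range(3):
    bad_nodes.append(lambda s: {"research_materials": [s["search_tasks"][i]]})
print([node(state)["research_materials"][0] for node in bad_nodes])
# ['Applications', 'Applications', 'Applications']
```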

Step 4: Assemble the Graph and Handle Concurrency

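Here is an advanced LangGraph trick: how do you make nodes execute in parallel? The answer: from the same starting node, draw edges to several different nodes, then converge them back into a single node. LangGraph runs the graph in steps. All nodes triggered in the same step run in parallel, and their updates are applied to the State together when that step ends. That is exactly where the Reducer kicks in.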

def build_agency_graph():
    builder = StateGraph(AgencyState)
    
    # Add nodes
    builder.add_node("planner", planner_node)
    
    # To demonstrate concurrency, we manually create 3 fixed Researcher node instances
    # In a real scenario, you would use the Send() API for dynamic concurrency (we'll cover this in later lessons)
    builder.add_node("researcher_1", lambda state: researcher_node(state, state["search_tasks"][0]))
    builder.add_node("researcher_2", lambda state: researcher_node(state, state["search_tasks"][1]))
    builder.add_node("researcher_3", lambda state: researcher_node(state, state["search_tasks"][2]))
    
    builder.add_node("writer", writer_node)
    
    # Define control flow (Edges)
    builder.add_edge(START, "planner")
    
    # Fan-out: After the Planner finishes, trigger 3 Researchers simultaneously
    builder.add_edge("planner", "researcher_1")
    builder.add_edge("planner", "researcher_2")
    builder.add_edge("planner", "researcher_3")
    
    # Fan-in: Only trigger the Writer after all 3 Researchers have finished
    # Passing a list of start nodes makes LangGraph wait until every one of them has completed before running "writer"
    builder.add_edge(["researcher_1", "researcher_2", "researcher_3"], "writer")
    
    builder.add_edge("writer", END)
    
    return builder.compile()

# --- Mock Run ---
if __name__ == "__main__":
    app = build_agency_graph()
    
    print("🚀 Starting AI Content Agency workflow...\n" + "="*40)
    
    initial_state = {"topic": "2024 Room-Temperature Superconductivity"}
    
    # Run the entire graph
    final_state = app.invoke(initial_state)
    
    print("\n" + "="*40)
    print("🎉 Final Deliverable:\n")
    print(final_state["final_article"])
    print("="*40)

When you run this code, you will see output similar to this:

🚀 Starting AI Content Agency workflow...
========================================
👨‍💼 [Planner] Breaking down topic: 2024 Room-Temperature Superconductivity
👨‍💼 [Planner] Breakdown complete, dispatching 3 concurrent search tasks.
🔍 [Researcher] Starting search for subtask: History of 2024 Room-Temperature Superconductivity...
🔍 [Researcher] Starting search for subtask: Latest technical breakthroughs in 2024 Room-Temperature Superconductivity...
🔍 [Researcher] Starting search for subtask: Commercial application cases of 2024 Room-Temperature Superconductivity...
✅ [Researcher] Search complete: History of 2024 Room-Temperature Superconductivity
✅ [Researcher] Search complete: Commercial application cases of 2024 Room-Temperature Superconductivity
✅ [Researcher] Search complete: Latest technical breakthroughs in 2024 Room-Temperature Superconductivity

✍️ [Writer] Received 3 research materials, starting to draft...
✍️ [Writer] Drafting complete!

========================================
🎉 Final Deliverable:

《In-Depth Analysis of 2024 Room-Temperature Superconductivity》

Synthesizing multiple sources, we have reached the following conclusions:
- In-depth report snippet on [History of 2024 Room-Temperature Superconductivity]
- In-depth report snippet on [Commercial application cases of 2024 Room-Temperature Superconductivity]
- In-depth report snippet on [Latest technical breakthroughs in 2024 Room-Temperature Superconductivity]

(End of Article)
========================================

Look! 3 pieces of material, not a single one missing! This is the magic of Reducers.
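Why did nothing go missing? A rough mental model (and only a model, since LangGraph's real scheduler is more involved): the three researchers are triggered in the same step and run in parallel. Their returned updates are collected and folded through the field's reducer before the writer runs. The sketch below imitates that with a thread pool. researcher and run_step are hypothetical stand-ins, and operator.add plays the reducer.

```python
import operator
import random
import time
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def researcher(task: str) -> dict:
    """Stand-in for a researcher node: sleeps a random time, then returns a partial update."""
    time.sleep(random.uniform(0.01, 0.05))
    return {"research_materials": [f"Report on {task}"]}


def run_step(state: dict, tasks: list) -> dict:
    """Toy 'step': run all triggered nodes in parallel, then merge their writes at once."""
    with ThreadPoolExecutor() as pool:
        # pool.map yields results in submission order, regardless of which thread finishes first
        updates = list(pool.map(researcher, tasks))
    # Fold every write into the field through the reducer (operator.add = list concatenation)
    merged = reduce(operator.add, (u["research_materials"] for u in updates), state["research_materials"])
    return {**state, "research_materials": merged}


state = {"research_materials": []}
tasks = ["History", "Breakthroughs", "Applications"]
final = run_step(state, tasks)
print(final["research_materials"])
# ['Report on History', 'Report on Breakthroughs', 'Report on Applications']
```

Collecting all writes first and merging them in one place is what removes the race. No worker ever touches the shared list directly. In this toy, the merged order follows submission order rather than finishing order.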


⚠️ Pitfalls and How to Avoid Them

As your mentor, I can't just teach you how to write demos. In real production environments, Reducers are a very common source of bugs. Below are 3 high-frequency pitfalls I've summarized. Make sure to jot these down in your notebook:

💣 Pitfall 1: Blindly Trusting operator.add

Many tutorials, the official examples included, simply write Annotated[list, operator.add]. Danger! If a node returns {"research_materials": None} because of a network error, an LLM hallucination, or some other failure, operator.add(list, None) raises a TypeError, and the whole graph run crashes. How to avoid: unless you can guarantee that every node returns a list, write a None-safe wrapper like safe_merge_materials in the code above.
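Instead of writing a new safe merge function for every field, one option is a small generic wrapper that makes any two-argument reducer None-tolerant. none_safe is a hypothetical helper, not part of LangGraph. You could then bind a field as Annotated[list, none_safe(operator.add)].

```python
import operator
from functools import wraps
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def none_safe(reducer: Callable[[T, T], T]) -> Callable[[Optional[T], Optional[T]], Optional[T]]:
    """Hypothetical helper: wrap any reducer so a None on either side is skipped."""
    @wraps(reducer)
    def wrapper(left, right):
        if right is None:
            return left   # failed node: keep what we already have
        if left is None:
            return right  # nothing collected yet: take the update as-is
        return reducer(left, right)
    return wrapper


# The raw operator blows up on a None update...
try:
    operator.add(["A"], None)
except TypeError as exc:
    print(f"TypeError: {exc}")

# ...while the wrapped version tolerates it.
safe_add = none_safe(operator.add)
print(safe_add(["A"], None))   # ['A']
print(safe_add(["A"], ["B"]))  # ['A', 'B']
```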

💣 Pitfall 2: In-place Mutation in Reducers

If you write code like left.append(right) or left.extend(right) in your custom Reducer and then return left, you will encounter extremely bizarre phantom bugs in certain concurrent scenarios or when using Time Travel (history backtracking). How to avoid: A Reducer must be a Pure Function! Always return a new object. Using left + right creates a new list, which is safe; using append is dangerous.
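The toy below shows why mutation is dangerous. It keeps a history of per-step snapshots the way a naive in-memory checkpoint might. This is a simplified stand-in chosen for illustration, not how LangGraph's checkpointers are implemented. run_with_history and both reducers are hypothetical names.

```python
from typing import List


def mutating_reducer(left: List[str], right: List[str]) -> List[str]:
    left.extend(right)   # DANGEROUS: modifies the object the caller still holds
    return left


def pure_reducer(left: List[str], right: List[str]) -> List[str]:
    return left + right  # SAFE: builds a brand-new list


def run_with_history(reducer) -> list:
    """Toy checkpointing: keep a snapshot of the field's value after every step."""
    value: List[str] = []
    history = [value]
    for update in (["A"], ["B"], ["C"]):
        value = reducer(value, update)
        history.append(value)  # stores a reference, like a naive in-memory checkpoint
    return history


print(run_with_history(mutating_reducer))
# [['A', 'B', 'C'], ['A', 'B', 'C'], ['A', 'B', 'C'], ['A', 'B', 'C']]  every "snapshot" is one object
print(run_with_history(pure_reducer))
# [[], ['A'], ['A', 'B'], ['A', 'B', 'C']]
```

With the mutating version, "going back" to step 1 shows the data from step 3, which is exactly the kind of phantom bug described above.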

💣 Pitfall 3: Thinking Reducers are Only for Concurrency

This is a common misconception. Reducers exist for more than just concurrency. Even if you have a linear flow (Node A -> Node B -> Node C), if you want each node to append logs or message records (like a messages list) to a list, you must use a Reducer. Without a Reducer, the data from the subsequent node will always overwrite the previous one.
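Here is a minimal illustration with no concurrency anywhere. Three hypothetical nodes run strictly one after another, and each returns a log entry. run_linear is a toy runner, not a LangGraph API. Passing a reducer switches it from overwriting to accumulating.

```python
import operator
from typing import Callable, Optional


def node_a(state: dict) -> dict:
    return {"log": ["A: planned"]}


def node_b(state: dict) -> dict:
    return {"log": ["B: researched"]}


def node_c(state: dict) -> dict:
    return {"log": ["C: wrote draft"]}


def run_linear(nodes, reducer: Optional[Callable] = None) -> dict:
    """Run nodes strictly one after another (no concurrency at all)."""
    state = {"log": []}
    for node in nodes:
        update = node(state)
        for key, value in update.items():
            # With a reducer: combine old and new. Without one: overwrite.
            state[key] = reducer(state[key], value) if reducer else value
    return state


print(run_linear([node_a, node_b, node_c])["log"])
# ['C: wrote draft']
print(run_linear([node_a, node_b, node_c], operator.add)["log"])
# ['A: planned', 'B: researched', 'C: wrote draft']
```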


📝 Episode Summary

Today, we took a deep dive into the underlying logic of State updates in LangGraph.

  1. We learned that without magic, State is just a ruthless "overwrite machine."
  2. We introduced Annotated and Reducers, granting State the ability to update incrementally.
  3. We hand-coded a safe list-merging function, perfectly solving the data loss issue caused by multiple Researchers gathering materials concurrently in our AI Agency.

Now, our AI Content Agency possesses powerful multi-threaded concurrent information gathering capabilities.

However, observant students might have noticed: our concurrency today is "static." That is, the Planner must rigidly assign 3 tasks to 3 hardcoded nodes. What if the Planner dynamically decides it needs to research 5 or 10 different directions?

In our next episode, 《Episode 13 | The Paradigm Shift of the Send API: How to Implement True Dynamic Concurrent Routing》, I will help you unlock LangGraph's ultimate concurrency weapon, giving your Agents true dynamic scaling capabilities!

Architects, keep coding, and I'll see you in the next episode! 🚀