Reducers

Incremental state accumulation for values that are expensive to recompute from the event log each round.

When to reach for a reducer: Pure event-driven handlers (log.filter(), log.latest()) are the default and work for most patterns. Add a Reducer when you need incremental accumulation that would be expensive to recompute from the full log each round — the canonical case is message_reducer() for LLM conversation history. Add a ScalarReducer for last-write-wins configuration values injected directly into handlers. If you find yourself calling log.filter(X) and transforming the result the same way in multiple handlers, that's a signal a reducer would help.

Reducer

A Reducer maps events to contributions for a named LangGraph state channel. The framework maintains the channel incrementally — handlers receive the accumulated value by declaring a parameter whose name matches the reducer.

from langgraph_events import Reducer, ScalarReducer, message_reducer, EventGraph, on

# --- Reducer: accumulates contributions from matching events ---
# (UserMsg and Reply are event types assumed to be defined elsewhere)
history = Reducer("history", event_type=UserMsg, fn=lambda e: [e.text], default=[])


@on(UserMsg)
def respond(event: UserMsg, history: list) -> Reply:
    # history contains all projected values so far
    ...


graph = EventGraph([respond], reducers=[history])
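To make the accumulation semantics concrete, here is a minimal pure-Python sketch of the fold a list-valued reducer performs over the event log. This is an illustration only, not the library's implementation; `fold_reducer` and the inline `UserMsg` class are hypothetical stand-ins.

```python
# Illustrative sketch: each matching event's contribution (a list) is
# concatenated onto the channel's accumulated value, starting from default.

def fold_reducer(events, event_type, fn, default):
    """Fold events into a single channel value, list-concat style."""
    value = list(default)
    for e in events:
        if isinstance(e, event_type):
            value = value + fn(e)  # contributions are lists; concatenate
    return value


class UserMsg:  # stand-in event type for the sketch
    def __init__(self, text):
        self.text = text


history = fold_reducer(
    [UserMsg("hi"), UserMsg("how are you?")],
    event_type=UserMsg,
    fn=lambda e: [e.text],
    default=[],
)
# history == ["hi", "how are you?"]
```

The framework maintains this value incrementally across rounds rather than re-folding the full log, which is the whole point of declaring a reducer.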

message_reducer

Built-in reducer for LangChain message accumulation. Projects MessageEvent.as_messages() into a messages state channel with smart deduplication via LangGraph's add_messages.

messages = message_reducer()
graph = EventGraph([call_llm, handle_tools], reducers=[messages])
log = graph.invoke([
    SystemPromptSet.from_str("You are a helpful assistant."),
    UserMessageReceived(message=HumanMessage(content="Hi")),
])

# Alternative: explicit default list
messages = message_reducer([SystemMessage(content="You are a helpful assistant.")])
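The "smart deduplication" comes from LangGraph's `add_messages`: a message whose id matches an existing one replaces it in place, while messages with new ids are appended. A simplified pure-Python sketch of that merge rule (using plain dicts instead of LangChain message objects, and a hypothetical `merge_messages` helper):

```python
# Illustrative sketch of id-based message merging: same id replaces
# in place, new id appends. Not LangGraph's actual implementation.

def merge_messages(existing, incoming):
    merged = list(existing)
    index = {m["id"]: i for i, m in enumerate(merged) if m.get("id")}
    for m in incoming:
        i = index.get(m.get("id"))
        if i is not None:
            merged[i] = m  # same id: replace in place
        else:
            if m.get("id"):
                index[m["id"]] = len(merged)
            merged.append(m)  # new id: append
    return merged


base = [{"id": "1", "content": "Hi"}]
out = merge_messages(
    base,
    [{"id": "1", "content": "Hi!"}, {"id": "2", "content": "Hello"}],
)
# out == [{"id": "1", "content": "Hi!"}, {"id": "2", "content": "Hello"}]
```

This replace-by-id behavior is what keeps the `messages` channel from accumulating duplicates when the same message is re-emitted.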

The parameter name messages matches the reducer name, so the framework injects the accumulated message list automatically:

@on(UserMessageReceived, ToolsExecuted)
async def call_llm(event: Event, messages: list[BaseMessage]) -> LLMResponded:
    # `llm` is a LangChain chat model assumed to be defined elsewhere
    response = await llm.ainvoke(messages)
    ...

See the ReAct Agent pattern for message_reducer() in action, and the Supervisor pattern for a custom Reducer.

ScalarReducer

Last-write-wins reducer for single values. Unlike Reducer (which accumulates lists), ScalarReducer injects a bare value — the most recent non-SKIP contribution wins.

temperature = ScalarReducer(
    "temperature", event_type=TempSet, fn=lambda e: e.value, default=0.7
)

SKIP

When a ScalarReducer function returns SKIP, the reducer value is left unchanged. This distinguishes "set to None" from "don't update."

from langgraph_events import SKIP, ScalarReducer

temperature = ScalarReducer(
    "temperature",
    event_type=ConfigUpdated,
    fn=lambda e: e.temp if e.temp is not None else SKIP,
    default=0.7,
)
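A minimal pure-Python sketch of the last-write-wins-with-SKIP semantics described above (illustrative only; `fold_scalar` and the inline `ConfigUpdated` class are hypothetical stand-ins, and the `SKIP` sentinel here mimics the library's):

```python
# Illustrative sketch: the most recent non-SKIP contribution wins;
# SKIP leaves the current value untouched, unlike an explicit None.

SKIP = object()  # sentinel meaning "don't update"


def fold_scalar(events, event_type, fn, default):
    value = default
    for e in events:
        if isinstance(e, event_type):
            contribution = fn(e)
            if contribution is not SKIP:
                value = contribution  # last non-SKIP write wins
    return value


class ConfigUpdated:  # stand-in event type for the sketch
    def __init__(self, temp):
        self.temp = temp


temp = fold_scalar(
    [ConfigUpdated(0.2), ConfigUpdated(None), ConfigUpdated(0.9)],
    event_type=ConfigUpdated,
    fn=lambda e: e.temp if e.temp is not None else SKIP,
    default=0.7,
)
# temp == 0.9 — the None update was SKIPped; 0.2 then 0.9 were applied
```

Without SKIP, the `None` event would have overwritten the temperature, which is exactly the "set to None" vs "don't update" distinction the sentinel exists to preserve.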