Core Concepts

Events

Events are frozen dataclasses that extend Event. Immutability guarantees a safe append-only log. Subclasses are automatically made into frozen dataclasses — no decorator needed:

from langgraph_events import Event


class OrderPlaced(Event):
    order_id: str
    total: float
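The immutability guarantee rests on Python's frozen-dataclass machinery, which can be illustrated with the stdlib alone (a sketch of the same mechanism; this `OrderPlaced` is a local stand-in, not the library's `Event` subclass):

```python
from dataclasses import FrozenInstanceError, dataclass


# Stand-in for the library's Event base: a frozen dataclass.
@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    total: float


event = OrderPlaced(order_id="A1", total=99.99)

# Any attempt to mutate a frozen instance raises FrozenInstanceError,
# so events already appended to the log can never change underneath you.
try:
    event.total = 0.0
except FrozenInstanceError:
    print("immutable")
```

Because instances can never be mutated in place, an append-only list of them is a trustworthy audit log.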

Events support inheritance. A handler subscribed to a parent type fires for all subtypes (isinstance matching). The built-in Auditable marker class is a common example — subscribe once with @on(Auditable) and every marked event is captured automatically:

from langgraph_events import Auditable, on


class OrderPlaced(Auditable):
    order_id: str


class OrderShipped(Auditable):
    order_id: str


@on(Auditable)
def audit(event: Auditable) -> None:
    # Fires for OrderPlaced, OrderShipped, and any Auditable subtype
    print(event.trail())
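The parent-type matching described above can be sketched as a plain `isinstance` dispatch over a handler registry (a hypothetical illustration of the idea, not the library's internals; all names here are local stand-ins):

```python
class Auditable:  # marker base, as in the doc's example
    pass


class OrderPlaced(Auditable):
    def __init__(self, order_id: str):
        self.order_id = order_id


class OrderShipped(Auditable):
    def __init__(self, order_id: str):
        self.order_id = order_id


# A registry maps each subscribed type to a handler. Dispatch uses
# isinstance, so subscribing to a parent type matches every subtype.
registry = [(Auditable, lambda e: type(e).__name__)]


def dispatch(event):
    return [handler(event) for etype, handler in registry if isinstance(event, etype)]


print(dispatch(OrderPlaced("A1")))   # fires the Auditable handler
print(dispatch(OrderShipped("A1")))  # same handler, different subtype
```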

@on(*EventTypes)

Decorate a function with @on(EventType) to subscribe it. Handlers receive the matching event and optionally an EventLog. They return a single Event, None (side-effect only), or Scatter.

@on(UserMessage)
def greet(event: UserMessage) -> Greeting:
    return Greeting(text="Hello!")

Handlers may also request config: RunnableConfig or store: BaseStore by type annotation (reducer channels are still injected by parameter name).

Multi-subscription — a single handler fires on multiple event types:

@on(UserMessage, ToolResult)
def call_llm(event: Event, log: EventLog) -> AssistantMessage:
    history = log.filter(Event)
    ...

EventGraph

The main entry point. Pass a list of handler functions and EventGraph derives the topology.

graph = EventGraph(
    [classify, respond, audit],
    max_rounds=50,           # default: 100; prevents infinite loops
    reducers=[my_reducer],   # optional — see Reducer section
)

max_rounds (default: 100) prevents infinite loops — the library auto-sets LangGraph's recursion_limit so this is the only knob you need. Exceeding the limit emits a MaxRoundsExceeded event (a Halted subclass) instead of raising, so checkpointed state stays clean and the graph can be retried.
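A minimal dispatch loop shows the intended semantics: when the round limit is hit with work still pending, a sentinel event is appended rather than an exception raised (illustrative sketch only; `Tick`, `run`, and this `MaxRoundsExceeded` are local stand-ins, not library code):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tick:
    n: int


@dataclass(frozen=True)
class MaxRoundsExceeded:  # stand-in for the library's Halted subclass
    rounds: int


def run(seed, handler, max_rounds=100):
    """Dispatch events round by round; emit a sentinel instead of raising."""
    log, pending = [], [seed]
    for _ in range(max_rounds):
        log.extend(pending)
        pending = [e for e in (handler(p) for p in pending) if e is not None]
        if not pending:
            return log
    # Limit reached with events still pending: record it as an event,
    # leaving the log itself clean and retryable.
    log.append(MaxRoundsExceeded(rounds=max_rounds))
    return log


# A handler that loops forever is cut off cleanly after five rounds:
log = run(Tick(0), lambda e: Tick(e.n + 1), max_rounds=5)
print(type(log[-1]).__name__)
```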

Visualizing the Event Flow

graph.mermaid() returns a Mermaid flowchart showing how events correlate through handlers. Events are nodes, handler names are edge labels, and side-effect handlers (returning None) are listed in a footer comment.

print(graph.mermaid())
graph LR
    classDef entry fill:none,stroke:none,color:none
    _e0_[ ]:::entry ==> MessageReceived
    MessageReceived -->|classify| MessageClassified
    MessageClassified -->|respond| ReplyProduced

LangGraph Escape Hatch

Access the underlying CompiledStateGraph for advanced patterns — subgraph composition, custom streaming modes, or direct state access:

compiled = graph.compiled
for chunk in compiled.stream({"events": [SeedEvent(...)]}, stream_mode="updates"):
    print(chunk)

EventLog

Immutable, ordered container returned by invoke/ainvoke. Handlers can also receive it as a second parameter.

@on(DraftProduced)
def evaluate(event: DraftProduced, log: EventLog) -> CritiqueReceived | FinalDraftProduced:
    request = log.latest(WriteRequested)        # most recent event of this type
    all_drafts = log.filter(DraftProduced)      # all events matching this type
    if log.has(CritiqueReceived):               # boolean check
        ...
| Method | Returns | Description |
| --- | --- | --- |
| log.filter(T) | list[T] | All events of type T |
| log.latest(T) | T \| None | Most recent event of type T |
| log.first(T) | T \| None | Earliest event of type T |
| log.has(T) | bool | Whether any event of type T exists |
| log.count(T) | int | Number of events matching type T |
| log.select(T) | EventLog | Filtered log (chainable) |
| log.after(T) | EventLog | Events after first occurrence of T |
| log.before(T) | EventLog | Events before first occurrence of T |
| len(log) | int | Total events |
| log[i] | Event | Index access |
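A minimal, hypothetical version of such a container shows how the type-based queries compose (local stand-ins throughout; this is not the library's implementation):

```python
class Event:
    pass


class A(Event):
    pass


class B(Event):
    pass


class EventLogSketch:
    """Immutable, ordered event container with type-based queries."""

    def __init__(self, events):
        self._events = tuple(events)  # tuple: no in-place mutation

    def filter(self, t):
        return [e for e in self._events if isinstance(e, t)]

    def latest(self, t):
        return next((e for e in reversed(self._events) if isinstance(e, t)), None)

    def has(self, t):
        return any(isinstance(e, t) for e in self._events)

    def select(self, t):
        # Chainable: returns a new, filtered log rather than a list.
        return EventLogSketch(self.filter(t))

    def __len__(self):
        return len(self._events)

    def __getitem__(self, i):
        return self._events[i]


log = EventLogSketch([A(), B(), A()])
print(len(log.filter(A)), log.has(B), len(log.select(A)))
```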

Halted

Return a Halted event (or subclass) from any handler to immediately stop the graph. No further handlers are dispatched. Subclass Halted with domain-specific fields instead of generic payloads:

class ContentBlocked(Halted):
    label: str

@on(Classified)
def guard(event: Classified) -> Reply | ContentBlocked:
    if event.label == "blocked":
        return ContentBlocked(label=event.label)
    return Reply(text="OK")

Built-in subtypes:

| Subtype | Emitted when |
| --- | --- |
| MaxRoundsExceeded | Graph exceeds max_rounds (has rounds: int field) |
| Cancelled | Async handler execution was cancelled |

Scatter

Return Scatter([event1, event2, ...]) to fan-out into multiple events. Each becomes a separate pending event, dispatched in the next round. Use Scatter[WorkItem] to annotate the produced type — this renders as a dashed edge in mermaid() diagrams.

@on(Batch)
def split(event: Batch) -> Scatter[WorkItem]:
    return Scatter([WorkItem(item=i) for i in event.items])


@on(WorkItem)
def process(event: WorkItem) -> WorkDone:
    return WorkDone(result=f"done:{event.item}")


@on(WorkDone)
def gather(event: WorkDone, log: EventLog) -> BatchResult | None:
    all_done = log.filter(WorkDone)
    batch = log.latest(Batch)
    if len(all_done) >= len(batch.items):
        return BatchResult(results=tuple(e.result for e in all_done))
    return None  # not all items done yet

Auditable

Marker base class for events that should be auto-logged. Subclass it and subscribe a single @on(Auditable) handler to capture every marked event automatically. The built-in trail() method returns a compact summary of the event's fields.

class TaskStarted(Auditable):
    name: str


@on(Auditable)
def log_event(event: Auditable) -> None:
    print(event.trail())
    # "[TaskStarted] name='deploy'"
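The summary shown above suggests trail() is roughly a class-name-plus-fields repr; a plausible stdlib sketch that reproduces that output (the format is assumed from the example, this is not the library's code):

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class TaskStarted:
    name: str

    def trail(self) -> str:
        # "[ClassName] field=value ...": compact field summary (assumed format)
        body = " ".join(f"{f.name}={getattr(self, f.name)!r}" for f in fields(self))
        return f"[{type(self).__name__}] {body}"


print(TaskStarted(name="deploy").trail())
```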

MessageEvent

Base class for events that wrap LangChain BaseMessage objects. Declare a message field (single message) or messages field (tuple of messages), and as_messages() auto-converts them. Pairs with message_reducer() for automatic message history accumulation.

from langchain_core.messages import HumanMessage, AIMessage


class UserMessageReceived(MessageEvent, Auditable):
    message: HumanMessage


class LLMResponded(MessageEvent, Auditable):
    message: AIMessage

SystemPromptSet

Built-in MessageEvent that wraps a SystemMessage. Makes the system prompt a first-class citizen in the event log — visible, queryable, and auditable.

from langgraph_events import SystemPromptSet, message_reducer, EventGraph
from langchain_core.messages import SystemMessage

messages = message_reducer()
graph = EventGraph([call_llm, execute_tools], reducers=[messages])

# Convenience factory
log = graph.invoke([
    SystemPromptSet.from_str("You are a helpful assistant with tools."),
    UserMessageReceived(message=HumanMessage(content="What's the weather?")),
])

# Or construct explicitly
seed = SystemPromptSet(message=SystemMessage(content="You are helpful"))

Reducers

When to use reducers: Pure event-driven handlers (log.filter(), log.latest()) are the default and work for most patterns. Add a Reducer when you need incremental accumulation that would be expensive to recompute from the full log each round — the canonical case is message_reducer() for LLM conversation history. Add a ScalarReducer for last-write-wins configuration values injected directly into handlers. If you find yourself calling log.filter(X) and transforming the result the same way in multiple handlers, that's a signal a reducer would help.

A Reducer maps events to contributions for a named LangGraph state channel. The framework maintains the channel incrementally — handlers receive the accumulated value by declaring a parameter whose name matches the reducer.

from langgraph_events import Reducer, ScalarReducer, message_reducer, EventGraph, on

# --- Reducer: accumulates contributions from matching events ---
history = Reducer("history", event_type=UserMsg, fn=lambda e: [e.text], default=[])


@on(UserMsg)
def respond(event: UserMsg, history: list) -> Reply:
    # history contains all projected values so far
    ...


graph = EventGraph([respond], reducers=[history])

# --- message_reducer: built-in for LangChain message accumulation ---
messages = message_reducer()
graph = EventGraph([call_llm, handle_tools], reducers=[messages])
log = graph.invoke([
    SystemPromptSet.from_str("You are a helpful assistant."),
    UserMessageReceived(message=HumanMessage(content="Hi")),
])

# Alternative: explicit default list
messages = message_reducer([SystemMessage(content="You are a helpful assistant.")])

# --- ScalarReducer: last-write-wins, injected as a bare value ---
temperature = ScalarReducer(
    "temperature", event_type=TempSet, fn=lambda e: e.value, default=0.7
)

The parameter name messages matches the reducer name, so the framework injects the accumulated message list automatically:

@on(UserMessageReceived, ToolsExecuted)
async def call_llm(event: Event, messages: list[BaseMessage]) -> LLMResponded:
    response = await llm.ainvoke(messages)
    ...
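Name-based injection like this is typically implemented by inspecting the handler's signature and filling any parameter whose name matches a known channel; a sketch of that idea with inspect (hypothetical mechanism, not the library's code):

```python
import inspect


def call_with_channels(handler, event, channels):
    """Pass the event positionally, then supply every remaining parameter
    whose name matches a reducer channel with that channel's value."""
    params = list(inspect.signature(handler).parameters)[1:]  # skip the event param
    kwargs = {name: channels[name] for name in params if name in channels}
    return handler(event, **kwargs)


def call_llm(event, messages):
    return f"{event} with {len(messages)} messages"


out = call_with_channels(call_llm, "UserMessageReceived", {"messages": ["sys", "hi"]})
print(out)
```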

SKIP

When a ScalarReducer function returns SKIP, the reducer value is left unchanged. This lets handlers opt out of updating the reducer for certain events.

from langgraph_events import SKIP, ScalarReducer

temperature = ScalarReducer(
    "temperature", event_type=ConfigUpdated, fn=lambda e: e.temp if e.temp is not None else SKIP, default=0.7
)
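The SKIP semantics amount to a last-write-wins fold with a sentinel that leaves the current value untouched; a runnable stdlib sketch (this SKIP is a local stand-in for the library's exported sentinel):

```python
SKIP = object()  # stand-in sentinel; the library exports its own


def fold_scalar(contributions, default):
    """Last-write-wins: each contribution replaces the current value,
    unless it is SKIP, in which case the value is left unchanged."""
    current = default
    for value in contributions:
        if value is not SKIP:
            current = value
    return current


# SKIP entries (e.g. events whose temp field was None) never clobber
# the most recent real setting:
print(fold_scalar([0.9, SKIP, 0.2, SKIP], default=0.7))
```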

Interrupted / Resumed

Interrupted is a bare marker class — subclass it with domain-specific fields to pause the graph and wait for human input. Resume with graph.resume(event) — the event is auto-dispatched (handlers subscribed to its type fire), then the framework creates a Resumed event alongside it. resume() requires an Event instance; passing a plain string or dict raises TypeError.

Requires a checkpointer (e.g., MemorySaver).

from langgraph.checkpoint.memory import MemorySaver


class OrderConfirmationRequested(Interrupted):
    order_id: str
    total: float


class ApprovalSubmitted(Event):
    approved: bool


@on(OrderPlaced)
def confirm(event: OrderPlaced) -> OrderConfirmationRequested:
    return OrderConfirmationRequested(order_id=event.order_id, total=event.total)


@on(ApprovalSubmitted)
def handle_approval(event: ApprovalSubmitted, log: EventLog) -> OrderConfirmed | OrderCancelled:
    confirm_event = log.latest(OrderConfirmationRequested)
    if event.approved:
        return OrderConfirmed(order_id=confirm_event.order_id)
    return OrderCancelled(reason="User declined")


graph = EventGraph([confirm, handle_approval], checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "order-1"}}

# First call — pauses at the interrupt
graph.invoke(OrderPlaced(order_id="A1", total=99.99), config=config)

# Check state and resume with a typed event
state = graph.get_state(config)
if state.is_interrupted:
    confirm_event = state.interrupted
    print(f"Approve order {confirm_event.order_id} for ${confirm_event.total}?")
log = graph.resume(ApprovalSubmitted(approved=True), config=config)

Streaming

All invoke/stream methods have async counterparts: ainvoke(), astream_events(), aresume(), astream_resume().

# Async stream with real-time LLM token deltas and passthrough custom frames
from langgraph_events import (
    CustomEventFrame,
    LLMStreamEnd,
    LLMToken,
    StateSnapshotFrame,
    emit_state_snapshot,
    emit_custom,
)


@on(SeedEvent)
def step(event: SeedEvent) -> ReplyProduced:
    emit_state_snapshot({"messages": [], "step": "draft"})
    emit_custom("tool.progress", {"pct": 50})
    return ReplyProduced(...)


async for item in graph.astream_events(
    SeedEvent(...),
    include_llm_tokens=True,
    include_custom_events=True,
):
    if isinstance(item, LLMToken):
        print(item.content, end="")
    elif isinstance(item, LLMStreamEnd):
        print("\n[done]", item.message_id)
    elif isinstance(item, StateSnapshotFrame):
        print("snapshot:", item.data)
    elif isinstance(item, CustomEventFrame):
        print("custom:", item.name, item.data)
    else:
        print(item)

Use include_llm_tokens=True for LLM token frames and include_custom_events=True for CustomEventFrame passthrough. Use emit_state_snapshot(data) / await aemit_state_snapshot(data) for typed snapshot frames, and emit_custom(name, data) / await aemit_custom(name, data) for all other stream-only telemetry without importing LangGraph callback APIs directly.